pyspark.RDD.saveAsNewAPIHadoopDataset

RDD.saveAsNewAPIHadoopDataset(conf: Dict[str, str], keyConverter: Optional[str] = None, valueConverter: Optional[str] = None) → None

Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are converted for output using either user-specified converters or, by default, “org.apache.spark.api.python.JavaToWritableConverter”.

New in version 1.1.0.

Parameters
conf : dict

Hadoop job configuration; for a file-based output format this names the OutputFormat class, the output key/value classes, and the output directory (see the example below)

keyConverter : str, optional

fully qualified classname of key converter (None by default)

valueConverter : str, optional

fully qualified classname of value converter (None by default)
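
The conf dictionary carries the Hadoop job settings read by the new mapreduce OutputFormat API. A minimal sketch for a file-based output format is shown below; the property names are the same ones used in the example that follows, and the output path is illustrative:

>>> conf = {
...     "mapreduce.job.outputformat.class": "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
...     "mapreduce.job.output.key.class": "org.apache.hadoop.io.IntWritable",
...     "mapreduce.job.output.value.class": "org.apache.hadoop.io.Text",
...     "mapreduce.output.fileoutputformat.outputdir": "/tmp/new_hadoop_file",  # illustrative output path
... }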

Examples

>>> import os
>>> import tempfile

Set the output/input format classes and the Writable key/value classes

>>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
>>> input_format_class = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
>>> with tempfile.TemporaryDirectory() as d:
...     path = os.path.join(d, "new_hadoop_file")
...
...     # Create the conf for writing
...     write_conf = {
...         "mapreduce.job.outputformat.class": output_format_class,
...         "mapreduce.job.output.key.class": key_class,
...         "mapreduce.job.output.value.class": value_class,
...         "mapreduce.output.fileoutputformat.outputdir": path,
...     }
...
...     # Write a temporary Hadoop file
...     rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
...     rdd.saveAsNewAPIHadoopDataset(conf=write_conf)
...
...     # Create the conf for reading
...     read_conf = {"mapreduce.input.fileinputformat.inputdir": path}
...
...     # Load this Hadoop file as an RDD
...     loaded = sc.newAPIHadoopRDD(input_format_class,
...         key_class, value_class, conf=read_conf)
...     sorted(loaded.collect())
[(1, ''), (1, 'a'), (3, 'x')]
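
By default, keys and values are converted to Writables by “org.apache.spark.api.python.JavaToWritableConverter”. The keyConverter and valueConverter arguments take the fully qualified name of a converter class available on the classpath; the sketch below (variable names are illustrative, and it reuses the classes set above) passes the default converter explicitly, which is equivalent to omitting the arguments, and shows where a custom converter classname would go:

>>> with tempfile.TemporaryDirectory() as d2:
...     path2 = os.path.join(d2, "converted_hadoop_file")
...
...     # Same required conf keys, pointing at a fresh output directory
...     conv_conf = {
...         "mapreduce.job.outputformat.class": output_format_class,
...         "mapreduce.job.output.key.class": key_class,
...         "mapreduce.job.output.value.class": value_class,
...         "mapreduce.output.fileoutputformat.outputdir": path2,
...     }
...
...     # Passing the default converter explicitly; a custom converter's
...     # fully qualified classname would be supplied the same way.
...     sc.parallelize([(2, "b"), (4, "y")]).saveAsNewAPIHadoopDataset(
...         conf=conv_conf,
...         keyConverter="org.apache.spark.api.python.JavaToWritableConverter",
...         valueConverter="org.apache.spark.api.python.JavaToWritableConverter")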