Apache Spark's DataFrameReader.json() can handle gzipped JSONlines files automatically, but there doesn't seem to be a way to get DataFrameWriter.json() to write compressed JSONlines files. The extra network I/O is very expensive in the cloud.
Is there a way around this problem?
With Spark 2.x (and maybe earlier; I did not test) there is a simpler way to write compressed JSON, which does not require changing the configuration:
val df: DataFrame = ...
df.write.option("compression", "gzip").json("/foo/bar")
This also works for CSV and Parquet: just use .csv() or .parquet() instead of .json() to write the file after setting the compression option.
The possible codecs are: none, bzip2, deflate, gzip, lz4 and snappy.
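For illustration, here is a minimal pyspark sketch of the same writer option applied to CSV and Parquet (the example DataFrame and output paths are assumptions; the Scala API mirrors these calls):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(100)  # placeholder DataFrame for the example

# Same "compression" option, different format writers
df.write.option("compression", "gzip").csv("/foo/bar_csv")            # gzip-compressed CSV part files
df.write.option("compression", "snappy").parquet("/foo/bar_parquet")  # snappy-compressed Parquet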
The following solutions use pyspark, but I assume the code in Scala would be similar.
The first option is to set the following when you initialise your SparkConf:
conf = SparkConf()
conf.set("spark.hadoop.mapred.output.compress", "true")
conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")
With the configuration above, any file you produce using that SparkContext is automatically compressed using gzip.
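A minimal sketch of how that might be wired together end to end (the SparkSession construction, example DataFrame, and output path are assumptions for illustration):

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set("spark.hadoop.mapred.output.compress", "true")
conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Any text output produced through this context comes out gzip-compressed,
# e.g. writing a DataFrame as JSON lines through the RDD API:
spark.range(10).toJSON().saveAsTextFile("/tmp/compressed_json")  # hypothetical output path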
The second option is for when you want to compress only selected outputs within your context. Let's say "df" is your DataFrame and filename is your destination:
df_rdd = df.toJSON()  # RDD of JSON strings, one record per line
df_rdd.saveAsTextFile(filename, compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
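As a quick sanity check, the compressed output can be read straight back, since, as noted in the question, DataFrameReader.json() handles gzipped files transparently (the path and session below are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# spark.read.json() detects the gzipped part files and decompresses them automatically
df_roundtrip = spark.read.json("/tmp/compressed_json")
df_roundtrip.show()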