 

Spark: writing DataFrame as compressed JSON


Apache Spark's DataFrameReader.json() can handle gzipped JSON Lines files automatically, but there doesn't seem to be a way to get DataFrameWriter.json() to write compressed JSON Lines files. The extra network I/O is very expensive in the cloud.
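For reference, a minimal PySpark sketch of the read side described above; the input path is hypothetical:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# gzipped input is detected by the .gz extension and decompressed transparently
df = spark.read.json("/data/events.json.gz")   # hypothetical path
df.show()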

Is there a way around this problem?

asked Aug 11 '15 by Sim


2 Answers

With Spark 2.x (and maybe earlier; I did not test) there is a simpler way to write compressed JSON that does not require changing the configuration:

val df: DataFrame = ...
df.write.option("compression", "gzip").json("/foo/bar")

This also works for CSV and for Parquet: just use .csv() or .parquet() instead of .json() after setting the compression option.

The possible codecs are: none, bzip2, deflate, gzip, lz4 and snappy.
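A rough PySpark equivalent of the same writer option, with hypothetical paths:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("/data/input.json")   # hypothetical input

# the same "compression" option applies to the other file-based writers
df.write.option("compression", "gzip").json("/out/json_gz")
df.write.option("compression", "gzip").csv("/out/csv_gz")
df.write.option("compression", "snappy").parquet("/out/parquet_snappy")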

answered Sep 27 '22 by nsantos


The following solutions use PySpark, but I assume the code in Scala would be similar.

The first option is to set the following when you initialise your SparkConf:

from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.hadoop.mapred.output.compress", "true")
conf.set("spark.hadoop.mapred.output.compression.codec",
         "org.apache.hadoop.io.compress.GzipCodec")
conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")

With the configuration above, any file you produce using that SparkContext is automatically gzip-compressed.
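A sketch of how the configured context might then be used, continuing from the conf object above (paths are hypothetical, and the compressed output relies on the behaviour this answer describes):

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(conf=conf)     # conf is the SparkConf configured above
sqlContext = SQLContext(sc)

df = sqlContext.read.json("/data/input.json")   # hypothetical input
df.write.json("/data/output")                   # gzip-compressed, per the settings inherited from conf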

The second option is useful if you only want to compress selected outputs within your context. Let's say df is your DataFrame and filename is your destination:

df_rdd = df.toJSON()
df_rdd.saveAsTextFile(filename,
                      compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
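As a quick check, the gzipped output can be read straight back, since the reader decompresses .gz part files automatically (assuming the same sqlContext and filename as above):

df_back = sqlContext.read.json(filename)   # gzip parts are decompressed transparently
df_back.show()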
answered Sep 27 '22 by giorgioca