I'm trying to create a Spark RDD from several JSON files compressed into a tar archive. For example, I have 3 files
file1.json
file2.json
file3.json
and these are contained in archive.tar.gz.
I want to create a DataFrame from the JSON files. The problem is that Spark is not reading the JSON files correctly: creating an RDD using sqlContext.read.json("archive.tar.gz") or sc.textFile("archive.tar.gz") results in garbled/extra output.
Is there some way to handle gzipped archives containing multiple files in Spark?
UPDATE
Using the method given in the answer to Read whole text files from a compression in Spark I was able to get things running, but this method does not seem to be suitable for large tar.gz archives (>200 MB compressed), as the application chokes on large archive sizes. Since some of the archives I'm dealing with reach sizes of up to 2 GB after compression, I'm wondering if there is some efficient way to deal with the problem.
I'm trying to avoid extracting the archives and then merging the files together, as this would be time-consuming.
A solution is given in Read whole text files from a compression in Spark. Using the code sample provided, I was able to create a DataFrame from the compressed archive like so:
// extractFiles and decode are the helper functions from the linked answer
val jsonRDD = sc.binaryFiles("gzarchive/*")
  .flatMapValues(x => extractFiles(x).toOption) // unpack each tar.gz into its member files
  .mapValues(_.map(decode()))                   // decode each file's bytes into a JSON string

val df = sqlContext.read.json(jsonRDD.map(_._2).flatMap(x => x))
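The snippet above assumes the extractFiles and decode helpers defined in the linked answer. For reference, here is a sketch of roughly what they look like, based on that answer and using Apache Commons Compress (details such as the buffer size are incidental):

import java.nio.charset.{Charset, StandardCharsets}
import scala.util.Try
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.spark.input.PortableDataStream

// Unpack a gzipped tar stream into one byte array per contained file.
def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
  val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
  Stream.continually(Option(tar.getNextTarEntry))
    .takeWhile(_.isDefined)          // stop when there are no more entries
    .flatMap(x => x)
    .filter(!_.isDirectory)          // skip directory entries
    .map(_ => {
      Stream.continually {
        val buffer = Array.fill[Byte](n)(-1)
        val i = tar.read(buffer, 0, n)
        (i, buffer.take(i))
      }.takeWhile(_._1 > 0)          // read until the current entry is exhausted
        .flatMap(_._2)
        .toArray
    })
    .toArray
}

// Turn a file's bytes into a String (here, the JSON text).
def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) =
  new String(bytes, charset)

Because sc.binaryFiles hands each archive to a single task as one unsplittable blob, the whole tar.gz has to fit through one executor, which is why this approach struggles with the larger archives mentioned above.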
This method works fine for tar archives of a relatively small size, but is not suitable for large archive sizes.
A better solution to the problem seems to be to convert the tar archives to Hadoop SequenceFiles, which are splittable and hence can be read and processed in parallel in Spark (unlike tar archives).
See: A Million Little Files – Digital Digressions by Stuart Sierra.
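For illustration, a minimal sketch of what the SequenceFile route could look like; the output path is hypothetical, and the one-off conversion step shown here still reads each archive on a single task, so for very large archives it is better done offline with a dedicated tool as the article describes:

// One-off conversion: write each extracted JSON document as a record of a
// SequenceFile (key = archive path, value = file contents as a string).
jsonRDD
  .flatMapValues(x => x)                       // (archivePath, jsonText) pairs
  .saveAsSequenceFile("json-as-sequencefile")  // hypothetical output path

// From then on, the SequenceFile is splittable and can be read in parallel.
val records = sc.sequenceFile[String, String]("json-as-sequencefile")
val df = sqlContext.read.json(records.map(_._2))

Once the data lives in a SequenceFile, Spark never has to touch the tar archives again, and the read side scales with the number of input splits rather than the number of archives.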