
How to read multiple gzipped files from S3 into a single RDD?

I have many gzipped files stored on S3, organized by project and by hour of the day. The file paths follow this pattern:

s3://<bucket>/project1/20141201/logtype1/logtype1.0000.gz
s3://<bucket>/project1/20141201/logtype1/logtype1.0100.gz
....
s3://<bucket>/project1/20141201/logtype1/logtype1.2300.gz

Since the data should be analyzed on a daily basis, I have to download and decompress the files belonging to a specific day and then assemble their contents into a single RDD.

There are probably several ways to do this, but I would like to know the best practice for Spark.

Thanks in advance.

asked Dec 15 '14 by shihpeng
People also ask

Can you concatenate Gzipped files?

Files compressed by gzip can be directly concatenated into larger gzipped files.

Is gzip splittable?

Snappy and GZip blocks are not splittable, but files with Snappy blocks inside a container file format such as SequenceFile or Avro can be split.

Does S3 have gzip?

Even though Amazon S3 has most of the features of a full-fledged web server, it does not transparently support GZIP. In other words, you have to compress the files with GZIP yourself and set the Content-Encoding header to gzip.

Can Spark read .GZ files?

Spark supports text files, SequenceFiles, and any other Hadoop InputFormat. In Spark, support for gzip input files should work the same as it does in Hadoop.
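
As a minimal illustration of that point, a hedged PySpark sketch (assuming an existing SparkContext named sc; the path below is a placeholder modeled on the question) shows that reading a gzipped file looks exactly like reading plain text:

# gzip decompression is handled transparently by the underlying Hadoop input format
lines = sc.textFile("s3://bucket/project1/20141201/logtype1/logtype1.0000.gz")
print(lines.take(5))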


2 Answers

The underlying Hadoop API that Spark uses to access S3 allows you to specify input files using a glob expression.

From the Spark docs:

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

So in your case you should be able to open all those files as a single RDD using something like this:

rdd = sc.textFile("s3://bucket/project1/20141201/logtype1/logtype1.*.gz")

Just for the record, you can also specify files using a comma-delimited list, and you can even mix that with the * and ? wildcards.

For example:

rdd = sc.textFile("s3://bucket/201412??/*/*.gz,s3://bucket/random-file.txt")

Briefly, what this does is:

  • The * matches any string, so in this case all .gz files in all folders under 201412?? will be loaded.
  • The ? matches a single character, so 201412?? covers every day in December 2014: 20141201, 20141202, and so forth.
  • The , lets you load separate files into the same RDD at once, like the random-file.txt in this case (a short sketch for the question's layout follows this list).
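
Putting the pieces together for the layout in the question, here is a minimal PySpark sketch; the bucket name and day are placeholders, and sc is assumed to be an existing SparkContext:

day = "20141201"
path = "s3://bucket/project1/%s/logtype1/logtype1.*.gz" % day

# all 24 hourly .gz files for that day end up in one RDD
daily_rdd = sc.textFile(path)
print(daily_rdd.count())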

Some notes about the appropriate URL scheme for S3 paths:

  • If you're running Spark on EMR, the correct URL scheme is s3://.
  • If you're running open-source Spark (i.e. no proprietary Amazon libraries) built on Hadoop 2.7 or newer, s3a:// is the way to go.
  • s3n:// has been deprecated on the open source side in favor of s3a://. You should only use s3n:// if you're running Spark on Hadoop 2.6 or older.
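
For the s3a:// case (open-source Spark on Hadoop 2.7 or newer), a hedged sketch of wiring up credentials in PySpark follows; the fs.s3a.* keys are standard Hadoop S3A settings, the credentials and bucket are placeholders, and the configuration is set through PySpark's internal Hadoop configuration handle:

# set S3A credentials on the Hadoop configuration used by this SparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
hadoop_conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")

rdd = sc.textFile("s3a://bucket/project1/20141201/logtype1/logtype1.*.gz")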
answered Oct 12 '22 by Nick Chammas

Note: Under Spark 1.2, the proper format would be as follows:

val rdd = sc.textFile("s3n://<bucket>/<foo>/bar.*.gz")

That's s3n://, not s3://

You'll also want to put your credentials in conf/spark-env.sh as AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
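
If you prefer to set the credentials programmatically rather than in conf/spark-env.sh, the classic Hadoop s3n keys can be set on the SparkContext's Hadoop configuration. A hedged PySpark sketch (credentials and path are placeholders):

# equivalent to exporting the AWS credentials before launching Spark
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
hadoop_conf.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")

rdd = sc.textFile("s3n://bucket/foo/bar.*.gz")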

answered Oct 12 '22 by Joseph Lust