 

Reading multiple files from S3 in Spark by date period

Description

I have an application that sends data to AWS Kinesis Firehose, which writes the data into my S3 bucket. Firehose uses the "yyyy/MM/dd/HH" prefix format to write the files.

Like in this sample S3 path:

s3://mybucket/2016/07/29/12 

Now I have a Spark application written in Scala in which I need to read data from a specific time period, given start and end dates. The data is in JSON format, which is why I use sqlContext.read.json() rather than sc.textFile().
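
For reference, reading a single Firehose hour prefix currently looks something like this (a minimal sketch assuming a Spark 1.x SQLContext, as used throughout this question, and the example bucket name from above):

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)   // sc is the existing SparkContext
    val df = sqlContext.read.json("s3://mybucket/2016/07/29/12")
    df.printSchema()                      // schema is inferred from the JSON records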

How can I read the data quickly and efficiently?

What have I tried?

  1. Wildcards - I can select the data from all hours of a specific date or all dates of a specific month, for example:

    val df = sqlContext.read.json("s3://mybucket/2016/07/29/*")
    val df = sqlContext.read.json("s3://mybucket/2016/07/*/*")

    But if I have to read data from a period spanning a few days, for example 2016-07-29 to 2016-07-30, I cannot use the wildcard approach in the same way.

    Which brings me to my next point...

  2. Using multiple paths or a comma-separated list of directories, as presented by samthebest in this solution. It seems that separating directories with commas only works with sc.textFile() and not with sqlContext.read.json().
  3. Union - A second solution from the previous link, by cloud, suggests reading each directory separately and then unioning them. Although he suggests unioning RDDs, there is an option to union DataFrames as well. If I generate the date strings from the given date period manually, I may create a path that does not exist, and instead of being ignored, the whole read fails. Instead, I could use the AWS SDK and call listObjects on AmazonS3Client to get all the keys, as in iMKanchwala's solution from the previous link (see the sketch after this list).

    The only problem is that my data is constantly changing. If the read.json() function gets all the data in a single call, it reads all the necessary data and is smart enough to infer the JSON schema from it. If I read two directories separately and their schemas don't match, then I think unioning those two DataFrames becomes a problem.

  4. Glob(?) syntax - This solution by nhahtdh is a little better than options 1 and 2 because it allows specifying dates and directories in more detail, and as a single "path", so it also works with read.json().

    But again, the familiar problem of missing directories occurs. Let's say I want all the data from 20.07 to 30.07; I can declare it like this:

    val df = sqlContext.read.json("s3://mybucket/2016/07/[20-30]/*") 

    But if I am missing data from, let's say, the 25th of July, then the path ..16/07/25/ does not exist and the whole function fails.
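
For option 3, a rough sketch of what the listObjects approach could look like (illustrative only; it assumes the AWS SDK for Java v1, default credentials, and a hard-coded example bucket and month):

    import com.amazonaws.services.s3.AmazonS3Client
    import com.amazonaws.services.s3.model.ListObjectsRequest
    import scala.collection.JavaConverters._

    // Default credential chain; in newer SDK versions a builder is preferred.
    val s3 = new AmazonS3Client()

    // List the "day" directories that actually exist under a month prefix;
    // the "/" delimiter returns common prefixes instead of every object.
    val listing = s3.listObjects(
      new ListObjectsRequest()
        .withBucketName("mybucket")
        .withPrefix("2016/07/")
        .withDelimiter("/"))

    val existingDays = listing.getCommonPrefixes.asScala.toSeq   // e.g. "2016/07/20/"

    // Keep only the days inside the requested interval and build full S3 paths.
    val wanted = (20 to 30).map(d => f"2016/07/$d%02d/").toSet
    val paths = existingDays.filter(wanted).map(p => s"s3://mybucket/$p*")

The resulting paths could then be read one by one and unioned, but the schema concern above remains when each directory is read separately.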

Returning to the glob approach: it obviously gets more difficult when the requested period spans months or years, for example 25.11.2015-12.02.2016. Then I would need to programmatically (in my Scala script) create a path string something like this:

"s3://mybucket/{2015/11/[25-30],2015/12/*,2016/01/*,2016/02/[01-12]}/*" 

And when creating it, I would need to somehow be sure that the 25-30 and 01-12 intervals all have corresponding paths; if one is missing, it fails again. (The asterisk fortunately deals with missing directories, as it reads everything that exists.)
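
For illustration, generating one path per day programmatically could look roughly like this (a sketch only; dayPaths is a hypothetical helper and Java 8's java.time is assumed to be available):

    import java.time.LocalDate
    import java.time.format.DateTimeFormatter

    // Hypothetical helper: one "s3://<bucket>/yyyy/MM/dd/*" path per day in [start, end].
    def dayPaths(bucket: String, start: LocalDate, end: LocalDate): Seq[String] = {
      val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd")
      Iterator.iterate(start)(_.plusDays(1))
        .takeWhile(d => !d.isAfter(end))
        .map(d => s"s3://$bucket/${d.format(fmt)}/*")
        .toSeq
    }

    val paths = dayPaths("mybucket", LocalDate.of(2015, 11, 25), LocalDate.of(2016, 2, 12))

This avoids hand-building the nested range glob, but it still produces paths for days that may not exist in the bucket, which is exactly the problem: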

How can I read all the necessary data at once, with a single path expression, without the possibility of failing because of a missing directory somewhere within the date interval?

Asked Jul 29 '16 by V. Samma




1 Answer

There is a much simpler solution. If you look at the DataFrameReader API you'll notice that there is a .json(paths: String*) method. Just build a collection of the paths you want, with globs or not, as you prefer, and then call the method, e.g.,

val paths: Seq[String] = ...
val df = sqlContext.read.json(paths: _*)
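
For instance (an illustrative sketch; the example paths are placeholders for whatever collection you build, e.g. from date generation plus an existence check):

    // Each element may be a concrete prefix or a glob.
    val paths: Seq[String] = Seq(
      "s3://mybucket/2016/07/29/*",
      "s3://mybucket/2016/07/30/*"
    )
    val df = sqlContext.read.json(paths: _*)
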
Answered Oct 06 '22 by Sim