 

Spark - Reading JSON from Partitioned Folders using Firehose

Kinesis Firehose manages the persistence of files, in this case time-series JSON, into a folder hierarchy that is partitioned by YYYY/MM/DD/HH (down to the hour, in 24-hour numbering)...great.

How, using Spark 2.0, can I then read these nested subfolders and create a static DataFrame from all the leaf JSON files? Is there an 'option' for the DataFrame reader?

My next goal is for this to be a streaming DataFrame, where new files persisted by Firehose into S3 naturally become part of the streaming DataFrame using the new structured streaming in Spark 2.0. I know this is all experimental - hoping someone has used S3 as a streaming file source before, where the data is partitioned into folders as described above. Of course I would prefer to read straight off a Kinesis stream, but there is no date on this connector for 2.0, so Firehose->S3 is the interim.

NB: I am using Databricks, which mounts S3 into DBFS, but it could easily be EMR or another Spark provider. It would be great to see a notebook too, if one is shareable, that gives an example.

Cheers!

asked Oct 30 '16 by Kurt Maile


People also ask

How do I read a JSON file in Spark?

Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset<Row>. This conversion can be done using SparkSession.read().json() on either a Dataset<String> or a JSON file.
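
A minimal Scala sketch of that API (the file path and app name here are just illustrations):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("json-read-example").getOrCreate()

// The schema is inferred automatically from the JSON records
val df = spark.read.json("path/to/people.json")
df.printSchema()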

Which is the correct code to read an employee JSON file in Spark?

This conversion can be done using SQLContext.read.json() on either an RDD of String or a JSON file. Spark SQL provides an option for querying JSON data along with auto-capturing of JSON schemas for both reading and writing data.
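
For instance, a small sketch of reading and then querying JSON with SQL (the entry point, path, and view name are illustrative; in Spark 2.x, spark.read works the same way):

// SQLContext is the older entry point; in Spark 2.x it delegates to SparkSession
val employeeDf = sqlContext.read.json("path/to/employee.json")
employeeDf.createOrReplaceTempView("employee")
sqlContext.sql("SELECT * FROM employee").show()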

How do I read JSON files in PySpark?

JSON is read using the spark.read.json("path") function. A "multiline" DataFrame is used for reading records from JSON files that are spread across multiple lines; to read such files, set the multiline option to true (by default, the multiline option is set to false).
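
The same option is available from Scala as well; a short sketch (the path is illustrative):

// multiline defaults to false; enable it for JSON records that span multiple lines
val multilineDf = spark.read
  .option("multiline", "true")
  .json("path/to/multiline.json")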


1 Answer

Can I read these nested subfolders and create a static DataFrame from all the leaf JSON files? Is there an 'option' for the DataFrame reader?

Yes, as your directory structure is regular (YYYY/MM/DD/HH), you can give the path down to the leaf nodes with wildcard characters, as below:

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate()

// Here */*/*/*/*.json maps to YYYY/MM/DD/HH/filename.json
val jsonDf = spark.read.json("base/path/*/*/*/*/*.json")
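
The second part of the question, making this a streaming DataFrame with structured streaming, is not covered by the snippet above. Here is a minimal sketch under the assumptions that the file stream source accepts the same glob path and that the JSON schema is supplied up front (file-based streaming sources require an explicit schema unless schema inference is enabled); the field names are hypothetical:

import org.apache.spark.sql.types._

// Hypothetical schema for the time-series records; adjust to the real fields
val schema = new StructType()
  .add("timestamp", StringType)
  .add("value", DoubleType)

// New files that Firehose writes under matching YYYY/MM/DD/HH folders are
// picked up as they appear
val streamingJsonDf = spark.readStream
  .schema(schema)
  .json("base/path/*/*/*/*/")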

Of course, I would prefer to read straight off a Kinesis stream, but there is no date on this connector for 2.0, so Firehose->S3 is the interim.

I can see there is a library for Kinesis integration with Spark Streaming, so you can read the streaming data directly and perform SQL operations on it without reading from S3.

groupId = org.apache.spark
artifactId = spark-streaming-kinesis-asl_2.11
version = 2.0.0
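
With sbt, for example, the dependency would look like this (adjust the Scala and Spark versions to match your build):

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.0.0"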

Sample code with Spark Streaming and SQL

import java.nio.charset.StandardCharsets

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

// Kinesis records arrive as Array[Byte]
val kinesisStream = KinesisUtils.createStream(
 streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
 [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)

kinesisStream.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

  // Decode the record bytes into JSON strings and let Spark infer the schema
  val jsonStrings = rdd.map(bytes => new String(bytes, StandardCharsets.UTF_8))
  val jsonDf = spark.read.json(jsonStrings)

  // Create a temporary view over the DataFrame
  jsonDf.createOrReplaceTempView("json_data_tbl")

  // With the DataFrame and SparkSession available, most Spark SQL
  // operations can be performed here
}
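
For example, a trivial query against the registered view (to be placed inside the foreachRDD block above, after the view is created):

// Inside foreachRDD, after createOrReplaceTempView("json_data_tbl")
spark.sql("SELECT COUNT(*) AS record_count FROM json_data_tbl").show()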
answered Oct 06 '22 by mrsrinivas