Kinesis Firehose manages the persistence of files, in this case time-series JSON, into a folder hierarchy partitioned by YYYY/MM/DD/HH (down to the hour, in 24-hour numbering)... great.
Using Spark 2.0, how can I read these nested subfolders and create a static DataFrame from all the leaf JSON files? Is there an 'option' for the DataFrame reader?
My next goal is for this to be a streaming DataFrame, where new files persisted by Firehose into S3 naturally become part of the streaming DataFrame, using the new Structured Streaming in Spark 2.0. I know this is all experimental; I'm hoping someone has used S3 as a streaming file source before, where the data is partitioned into folders as described above. Of course I would prefer to read straight off a Kinesis stream, but there is no date on this connector for 2.0, so Firehose->S3 is the interim.
NB: I am using Databricks, which mounts S3 into DBFS, but it could easily be EMR or another Spark provider. It would be great to see a notebook too, if one is shareable, that gives an example.
Cheers!
Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset<Row>. This conversion can be done using SparkSession.read().json() on either a Dataset<String> or a JSON file.
With the older entry point, the same conversion can be done using SQLContext.read.json() on either an RDD of String or a JSON file. Spark SQL can query JSON data and captures the JSON schema automatically, for both reading and writing.
JSON is read with the spark.read.json("path") function. To read files whose records span multiple lines (the "multiline_dataframe" case), set the multiline option to true; it defaults to false.
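As a minimal sketch of the schema inference described above, the snippet below reads two JSON strings through an RDD of String, which is the form available in Spark 2.0. The field names (sensor, ts, value) and the app name are hypothetical, chosen only for illustration; it is meant to be pasted into a notebook or spark-shell.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("json-schema-sketch").getOrCreate()

// Two hypothetical time-series records, one JSON object per string
val lines = Seq(
  """{"sensor":"a","ts":"2016-08-01T10:00:00Z","value":1.5}""",
  """{"sensor":"b","ts":"2016-08-01T10:00:05Z","value":2.0}"""
)

// Spark samples the records and infers the column names and types
val jsonRdd = spark.sparkContext.parallelize(lines)
val df = spark.read.json(jsonRdd)
df.printSchema()
```

The same call accepts a file or directory path instead of an RDD, so the inferred schema can be checked on a small sample before pointing the reader at the full S3 hierarchy.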
Can I read nested subfolders and create a static DataFrame from all the leaf JSON files? Is there an option to the DataFrame reader?
Yes. Because your directory structure is regular (YYYY/MM/DD/HH), you can give the path down to the leaf files using wildcard characters, as below:
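import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate()
// json(...) already selects the JSON format, so format("json") is not needed
val jsonDf = spark.read.json("base/path/*/*/*/*/*.json")
// Here */*/*/*/*.json maps to YYYY/MM/DD/HH/filename.json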
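The wildcard path can also be narrowed so that only part of the hierarchy is read. The helper below is a hypothetical sketch (dayGlob is not a Spark function) that builds a glob covering every hour of a single day, assuming the zero-padded YYYY/MM/DD/HH layout from the question and files ending in .json.

```scala
import java.time.LocalDate

// Hypothetical helper: glob for all hourly folders of one day,
// assuming a zero-padded YYYY/MM/DD/HH layout under basePath
def dayGlob(basePath: String, day: LocalDate): String = {
  val base = basePath.stripSuffix("/")
  f"$base/${day.getYear}%04d/${day.getMonthValue}%02d/${day.getDayOfMonth}%02d/*/*.json"
}

// Example: the path for 1 August 2016
val august1 = dayGlob("base/path", LocalDate.of(2016, 8, 1))
// It could then be passed to the reader: spark.read.json(august1)
```

Reading one day at a time keeps the file listing small, which tends to matter on S3 where listing many prefixes is slow.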
Of course, I would prefer to read straight off a Kinesis stream, but there is no date on this connector for 2.0, so Firehose->S3 is the interim.
There is a library for Kinesis integration with Spark Streaming (the DStream API, not Structured Streaming). With it you can read the streaming data directly and run SQL operations on it without reading from S3. The dependency is:
groupId = org.apache.spark
artifactId = spark-streaming-kinesis-asl_2.11
version = 2.0.0
Sample code with Spark Streaming and SQL
import java.nio.charset.StandardCharsets
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

// Bracketed arguments are placeholders to fill in; records arrive as Array[Byte]
val kinesisStream = KinesisUtils.createStream(
  streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
  [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)

kinesisStream.foreachRDD { rdd =>
  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  // Decode each record's bytes to a JSON string (assuming UTF-8 payloads)
  val jsonStrings = rdd.map(bytes => new String(bytes, StandardCharsets.UTF_8))
  // Parse the JSON strings into a DataFrame; the schema is inferred per batch
  val jsonDf = spark.read.json(jsonStrings)
  // Create a temporary view over the DataFrame
  jsonDf.createOrReplaceTempView("json_data_tbl")
  // With the DataFrame and SparkSession available, most Spark SQL
  // operations can be performed here
}
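If Firehose->S3 stays as the interim route, the Structured Streaming file source mentioned in the question could be set up roughly as below. This is a sketch under assumptions, not a tested S3 setup: the schema fields and app name are hypothetical, the base path is the same placeholder used earlier, and the wildcard directory path is assumed to be picked up by the streaming reader the same way the batch reader handles it. By default the file source needs an explicit schema rather than inferring one.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder.master("local[*]").appName("s3-json-stream-sketch").getOrCreate()

// Explicit schema for the streaming file source; these fields are hypothetical
val schema = new StructType()
  .add("sensor", StringType)
  .add("ts", TimestampType)
  .add("value", DoubleType)

// Watch the hourly folders; files that appear later are treated as new input
val streamingDf = spark.readStream
  .schema(schema)
  .json("base/path/*/*/*/*/")
```

A query would then be started from streamingDf.writeStream, for example with the console sink while experimenting, and new files written by Firehose would be processed as they are discovered.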