I'm working with Spark Structured Streaming (2.2.1), using Kafka to receive data from sensors every 60 seconds. I'm having trouble wrapping my head around how to package this Kafka data so that I can process it correctly as it arrives.
I need to be able to do some calculations on the data as it comes in from Kafka.
My issue is unpacking the JSON data coming from Kafka into datasets I can work with.
A simplified record looks something like this:
{
  "id": 1,
  "timestamp": "timestamp",
  "pump": {
    "current": 1.0,
    "flow": 20.0,
    "torque": 5.0
  },
  "reactors": [
    {
      "id": 1,
      "status": 200
    },
    {
      "id": 2,
      "status": 300
    }
  ],
  "settings": {
    "pumpTimer": 20.0,
    "reactorStatusTimer": 200.0
  }
}
In order to work with this in Spark, I've created case class structures for each of these objects:
// First, general package
case class RawData(id: String, timestamp: String, pump: String, reactors: Array[String], settings: String)
// Each of the objects from the data
case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)
And generating the schema using:
val rawDataSchema = Encoders.product[RawData].schema
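For context, df in the snippets below is the streaming DataFrame read from the Kafka source. A minimal sketch of how it might be created (the broker address and topic name are placeholders):
// Assumed Kafka source setup; adjust the broker and topic to your environment
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "sensor-data") // placeholder topic
  .load()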
First, I put the value field from Kafka into my general schema:
val rawDataSet = df.select($"value" cast "string" as "json")
.select(from_json($"json", rawDataSchema) as 'data)
.select("data.*").as[RawData]
Using this rawDataSet, I can package each of the individual objects into datasets.
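The pumpSchema and settingsSchema used below are assumed to be generated the same way as rawDataSchema:
// Assumption: per-object schemas are derived from their case classes
val pumpSchema = Encoders.product[Pump].schema
val settingsSchema = Encoders.product[Settings].schema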
val pump = rawDataSet.select(from_json($"pump", pumpSchema) as 'pumpData)
.select("pumpData.*").as[Pump]
val settings = rawDataSet.select(from_json($"settings", settingsSchema) as 'settingsData)
.select("settingsData.*").as[Settings]
And this gives me nice and clean datasets per JSON object.
Here is my issue: if I want to, for example, compare or calculate some values between the two datasets for Settings and Pump, a JOIN does not work with Structured Streaming.
val joinedData = pump.join(settings)
Error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;
Is my approach for this all wrong? Or are there any recommendations for alternative ways to handle this?
Thanks
I'll answer my own question with my now-working solution.
Instead of making separate case classes for each of the objects within the JSON, I can connect them together as one case class with nested objects, like this:
case class RawData(
  id: String,
  timestamp: String,
  pump: Pump,
  reactors: Array[Reactor],
  settings: Settings
)
case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)
To make this into a usable Dataset, I could simply call
val rawDataset = df.select($"value" cast "string" as "json")
.select(from_json($"json", Encoders.product[RawData].schema) as 'data)
.select("data.*").as[RawData]
.withColumn("reactor", explode($"reactors")) // Handles the array of reactors, making one row in the dataset per reactor.
After having processed the JSON and put it into my defined schema, I can select each specific sensor like this:
val tester = rawDataset.select($"pump.current", $"settings.pumpTimer")
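Since everything now lives in one streaming Dataset, comparisons between Pump and Settings values are just column expressions, so no stream-stream join is needed. A rough sketch of the kind of calculation and output this enables (the comparison itself and the console sink are purely illustrative):
// Illustrative only: flag rows where the pump current exceeds the configured pump timer
val flagged = rawDataset
  .withColumn("pumpOverLimit", $"pump.current" > $"settings.pumpTimer")

// Write each micro-batch to the console sink for inspection
val query = flagged.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()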
Thank you user6910411 for pointing me in the right direction.