
Structured Streaming and Splitting nested data into multiple datasets

I'm working with Spark Structured Streaming (2.2.1), using Kafka to receive data from sensors every 60 seconds. I'm having trouble wrapping my head around how to package this Kafka data so that I can process it correctly as it comes in.

I need to be able to do some calculations on the data as it comes in from Kafka.

My issue is unpacking the JSON data coming from Kafka into datasets I can work with.

Data

A simplified data point looks something like this:

{
  "id": 1,
  "timestamp": "timestamp",
  "pump": {
    "current": 1.0,
    "flow": 20.0,
    "torque": 5.0
  },
  "reactors": [
    {
      "id": 1,
      "status": 200
    },
    {
      "id": 2,
      "status": 300
    }
  ],
  "settings": {
    "pumpTimer": 20.0,
    "reactorStatusTimer": 200.0
  }
}

In order to be able to work with this in Spark, I've created some case class structures for each of these:

// First, general package
case class RawData(id: String, timestamp: String, pump: String, reactors: Array[String], settings: String)
// Each of the objects from the data
case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)

And generating the schema using:

val rawDataSchema = Encoders.product[RawData].schema

Raw data to Spark Schema

First, I parse the 'value' field from Kafka with my general schema:

val rawDataSet = df.select($"value" cast "string" as "json")
  .select(from_json($"json", rawDataSchema) as 'data)
  .select("data.*").as[RawData]

Using this rawDataSet, I can package each of the individual objects into datasets.
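
The pumpSchema and settingsSchema below are generated the same way as rawDataSchema; shown here for completeness:

// Schemas for the nested objects, generated the same way as rawDataSchema.
val pumpSchema = Encoders.product[Pump].schema
val settingsSchema = Encoders.product[Settings].schema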

val pump = rawDataSet.select(from_json($"pump", pumpSchema) as 'pumpData)
  .select("pumpData.*").as[Pump]

val settings = rawDataSet.select(from_json($"settings", settingsSchema) as 'settingsData)
  .select("settingsData.*").as[Settings]

And this gives me nice and clean datasets per JSON object.

Working with the data

Here is my issue: if I want to, for example, compare or calculate some values between the two datasets for Settings and Pump, JOIN does not work with Structured Streaming.

val joinedData = pump.join(settings)

Error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;

Is my approach for this all wrong? Or are there any recommendations for alternative ways to handle this?

Thanks




1 Answer

I'll answer my own question with my now-working solution.

Instead of making case classes for each of the objects within the JSON, I could connect these together as one case class with nested objects, like so:

case class RawData(
  id: String, 
  timestamp: String, 
  pump: Pump, 
  reactors: Array[Reactor], 
  settings: Settings
)

case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)

To make this into a usable Dataset, I could simply call

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.{explode, from_json}

val rawDataset = df.select($"value" cast "string" as "json")
  .select(from_json($"json", Encoders.product[RawData].schema) as 'data)
  .select("data.*").as[RawData]
  .withColumn("reactor", explode($"reactors")) // Handles the array of reactors, making one row per reactor.

After having processed the JSON and put it into my defined schema, I could select each specific sensor like this:

val tester = rawDataset.select($"pump.current", $"settings.pumpTimer")
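
And since pump and settings now live on the same row, comparing them no longer needs a join. A hypothetical check (the condition itself is just an illustration), written to a console sink for testing, could look like this:

// Hypothetical comparison between pump and settings on the same row; no join needed.
val alerts = rawDataset
  .filter($"pump.flow" > $"settings.pumpTimer")
  .select($"id", $"timestamp", $"pump.flow", $"settings.pumpTimer")

// Console sink, just for inspecting the stream while testing.
alerts.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()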

Thank you user6910411 for pointing me in the right direction
