I'm writing a test application that consumes messages from Kafka's topcis and then push data into S3 and into RDBMS tables (flow is similar to presented here: https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html). So I read data from Kafka and then:
So I have sth like:
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2,topic3")
.option("startingOffsets", "earliest")
.load()
.select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"))
(please notice that I'm reading from more than one Kafka topic). Next I define required datasets:
Dataset<Row> allMessages = df.select(.....)
Dataset<Row> messagesOfType1 = df.select() //some unique conditions applied on JSON elements
Dataset<Row> messagesOfType2 = df.select() //some other unique conditions
and now for each Dataset I create query to start processing:
StreamingQuery s3Query = allMessages
.writeStream()
.format("parquet")
.option("startingOffsets", "latest")
.option("path", "s3_location")
.start()
StreamingQuery firstQuery = messagesOfType1
.writeStream()
.foreach(new CustomForEachWiriterType1()) // class that extends ForeachWriter[T] and save data into external RDBMS table
.start();
StreamingQuery secondQuery = messagesOfType2
.writeStream()
.foreach(new CustomForEachWiriterType2()) // class that extends ForeachWriter[T] and save data into external RDBMS table (may be even another database than before)
.start();
Now I'm wondering:
Will be those queries executed in parallel (or one after another in FIFO order and I should assign those queries to separate scheduler pools)?
Structured Streaming lets you express computation on streaming data in the same way you express a batch computation on static data. The Structured Streaming engine performs the computation incrementally and continuously updates the result as streaming data arrives.
Note that, if you want to receive multiple streams of data in parallel in your streaming application, you can create multiple input DStreams (discussed further in the Performance Tuning section). This will create multiple receivers which will simultaneously receive multiple data streams.
Spark Structured Streaming is a stream processing engine built on Spark SQL that processes data incrementally and updates the final results as more streaming data arrives. It brought a lot of ideas from other structured APIs in Spark (Dataframe and Dataset) and offered query optimizations similar to SparkSQL.
Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
Will be those queries executed in parallel
Yes. These queries are going to be executed in parallel (every trigger
which you did not specify and hence is to run them as fast as possible).
Internally, when you execute start
on a DataStreamWriter, you create a StreamExecution
that in turn creates immediately so-called daemon microBatchThread
(quoted from the Spark source code below):
val microBatchThread =
new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
override def run(): Unit = {
// To fix call site like "run at <unknown>:0", we bridge the call site from the caller
// thread to this micro batch thread
sparkSession.sparkContext.setCallSite(callSite)
runBatches()
}
}
You can see every query in its own thread with name:
stream execution thread for [prettyIdString]
You can check the separate threads using jstack or jconsole.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With