How does Structured Streaming execute separate streaming queries (in parallel or sequentially)?

I'm writing a test application that consumes messages from Kafka topics and then pushes the data into S3 and into RDBMS tables (the flow is similar to the one presented here: https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html). So I read data from Kafka and then:

  • every message should be saved to S3
  • some messages should be saved to table A in an external database (based on a filter condition)
  • other messages should be saved to table B in an external database (based on a different filter condition)

So I have something like:

Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2,topic3")
.option("startingOffsets", "earliest")
.load()
.select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"));

(please notice that I'm reading from more than one Kafka topic). Next I define required datasets:

Dataset<Row> allMessages = df.select(...);
Dataset<Row> messagesOfType1 = df.select(...); // some unique conditions applied on JSON elements
Dataset<Row> messagesOfType2 = df.select(...); // some other unique conditions
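
For illustration, the elided conditions could look something like this (the `type` field is a made-up placeholder, not my real schema):

Dataset<Row> messagesOfType1 = df.filter(col("parsed_value.type").equalTo("type1")); // hypothetical JSON field
Dataset<Row> messagesOfType2 = df.filter(col("parsed_value.type").equalTo("type2")); // hypothetical JSON field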

and now, for each Dataset, I create a query to start the processing:

StreamingQuery s3Query = allMessages
.writeStream()
.format("parquet")
.option("path", "s3_location")
.option("checkpointLocation", "s3_checkpoint_location") // the file sink requires a checkpoint location; the path here is a placeholder
.start();

StreamingQuery firstQuery = messagesOfType1
.writeStream()
.foreach(new CustomForEachWriterType1()) // class that extends ForeachWriter[T] and saves data into an external RDBMS table
.start();

StreamingQuery secondQuery = messagesOfType2
.writeStream()
.foreach(new CustomForEachWriterType2()) // class that extends ForeachWriter[T] and saves data into an external RDBMS table (possibly even a different database than before)
.start();
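
Each such writer is a sketch along these lines (the JDBC URL, credentials, and SQL below are placeholders, not my real code):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;

public class CustomForEachWriterType1 extends ForeachWriter<Row> {
    private Connection connection;

    @Override
    public boolean open(long partitionId, long version) {
        try {
            // placeholder connection details
            connection = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "pass");
            return true; // process this partition
        } catch (SQLException e) {
            return false; // skip this partition if the connection fails
        }
    }

    @Override
    public void process(Row row) {
        // simplified: real code would map actual columns to table A
        try (PreparedStatement ps =
                connection.prepareStatement("INSERT INTO table_a (payload) VALUES (?)")) {
            ps.setString(1, row.mkString(","));
            ps.executeUpdate();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close(Throwable errorOrNull) {
        try {
            if (connection != null) connection.close();
        } catch (SQLException ignored) {
        }
    }
}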

Now I'm wondering:

Will those queries be executed in parallel, or one after another in FIFO order (in which case, should I assign them to separate scheduler pools)?

asked May 14 '17 by mm112

People also ask

How does structured streaming work?

Structured Streaming lets you express computation on streaming data in the same way you express a batch computation on static data. The Structured Streaming engine performs the computation incrementally and continuously updates the result as streaming data arrives.
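
As a rough illustration (my sketch, not from the quoted answer; it uses Spark's built-in `rate` test source, available since Spark 2.2):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

// the same aggregation you would write on a static DataFrame, applied to a
// stream: the engine evaluates it incrementally and keeps the counts current
Dataset<Row> counts = spark
    .readStream()
    .format("rate") // test source emitting (timestamp, value) rows
    .load()
    .groupBy(window(col("timestamp"), "1 minute"))
    .count();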

Which Spark streaming function is used to combine streams that are running in parallel?

Note that, if you want to receive multiple streams of data in parallel in your streaming application, you can create multiple input DStreams (discussed further in the Performance Tuning section). This will create multiple receivers which will simultaneously receive multiple data streams.
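
A minimal sketch of that with the legacy DStream API (hosts and ports are placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;

SparkConf conf = new SparkConf().setAppName("multi-stream");
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));

// two receivers pulling data in parallel, then combined with union()
JavaReceiverInputDStream<String> stream1 = ssc.socketTextStream("host1", 9999);
JavaReceiverInputDStream<String> stream2 = ssc.socketTextStream("host2", 9999);
JavaDStream<String> combined = stream1.union(stream2);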

How does Spark structured streaming work?

Spark Structured Streaming is a stream processing engine built on Spark SQL that processes data incrementally and updates the final results as more streaming data arrives. It borrows many ideas from Spark's other structured APIs (DataFrame and Dataset) and offers query optimizations similar to Spark SQL.

Is structured streaming exactly once?

Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
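
In practice that means giving each query a checkpoint location, e.g. (the path is a placeholder; use a reliable location such as HDFS or S3):

// the engine uses this directory for its write-ahead log and progress
// tracking, which is what enables the end-to-end exactly-once guarantee
StreamingQuery query = df.writeStream()
    .format("parquet")
    .option("path", "output_path")
    .option("checkpointLocation", "checkpoint_path")
    .start();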


1 Answer

Will those queries be executed in parallel

Yes. These queries are going to be executed in parallel, each with its own trigger (which you did not specify, so each runs its micro-batches as fast as possible).
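
If you want to control the cadence instead, you can give each query its own trigger and then block on all of them, e.g. (assuming Spark 2.2+, where the Trigger API is available):

import org.apache.spark.sql.streaming.Trigger;

StreamingQuery s3Query = allMessages
    .writeStream()
    .format("parquet")
    .option("path", "s3_location")
    .option("checkpointLocation", "s3_checkpoint_location") // placeholder path
    .trigger(Trigger.ProcessingTime("30 seconds")) // micro-batch every 30s
    .start();

// block the driver until any of the started queries terminates
// (throws StreamingQueryException, so declare or handle it)
spark.streams().awaitAnyTermination();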


Internally, when you execute start on a DataStreamWriter, you create a StreamExecution that in turn immediately creates the so-called daemon microBatchThread (quoted from the Spark source code below):

  val microBatchThread =
    new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
      override def run(): Unit = {
        // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
        // thread to this micro batch thread
        sparkSession.sparkContext.setCallSite(callSite)
        runBatches()
      }
    }

You can see that every query runs in its own thread, with the name:

stream execution thread for [prettyIdString]

You can check the separate threads using jstack or jconsole.
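
You can also confirm it from inside the application; a small sketch that lists those threads by name:

// print every live thread whose name marks it as a stream execution thread
Thread.getAllStackTraces().keySet().stream()
    .map(Thread::getName)
    .filter(name -> name.startsWith("stream execution thread"))
    .forEach(System.out::println);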

answered Oct 25 '22 by Jacek Laskowski