How to start multiple streaming queries in a single Spark application?

I have built a few Spark Structured Streaming queries to run on EMR. They are long-running, ETL-type queries that need to run at all times. When I submit a job to the YARN cluster on EMR, I can submit a single Spark application, so that one Spark application has to run multiple streaming queries.

I am confused about how to build and start multiple streaming queries within the same submit programmatically.

For example, I have this code:

case class SparkJobs(prop: Properties) extends Serializable {
  def run() = {
      Type1SparkJobBuilder(prop).build().awaitTermination()
      Type2SparkJobBuilder(prop).build().awaitTermination()
  }
}

I invoke this in my main class with SparkJobs(new Properties()).run()

When I look at the Spark History Server, only the first streaming job (Type1SparkJob) is running.

What is the recommended way to start multiple streaming queries within the same spark-submit programmatically? I could not find proper documentation on this either.

asked Oct 11 '18 by Naveen Cotha

People also ask

Does Spark Streaming support batch operations?

Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

Does Spark Streaming run continuously?

Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data.

What is the difference between Spark Streaming and structured Streaming?

Spark Streaming receives real-time data and divides it into smaller batches for the execution engine, whereas Structured Streaming is built on the Spark SQL API for data stream processing. Structured Streaming queries are optimized by the Spark Catalyst optimizer and, like DStreams, are ultimately translated into RDDs for execution under the hood.
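
To make the contrast concrete, here is a minimal Structured Streaming sketch; the socket source, console sink, and host/port values are illustrative choices, not part of the question:

import org.apache.spark.sql.SparkSession

object StructuredStreamingExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("StructuredStreamingExample")
      .getOrCreate()

    // In Structured Streaming the stream is an unbounded DataFrame,
    // so ordinary Spark SQL operations apply to it
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Echo each micro-batch to the console as it arrives
    val query = lines.writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}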

What is batch interval in Spark Streaming?

A batch interval tells Spark for what duration to collect incoming data; if it is 1 minute, each batch contains the data received during the last minute (source: spark.apache.org). The data then arrives as a continuous sequence of such batches, and that continuous stream of batches is called a DStream.
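
As an illustrative sketch (the 60-second interval and socket source are assumptions for the example), a DStream with a 1-minute batch interval is set up like this:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DStreamExample")
    // Batch interval of 60 seconds: each micro-batch covers
    // the data received during the last minute
    val ssc = new StreamingContext(conf, Seconds(60))

    // lines is a DStream: a continuous sequence of RDDs, one per batch
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}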


1 Answer

Since you're calling awaitTermination on the first query, it blocks until that query completes before the second query ever starts. Instead, kick off both queries and then use StreamingQueryManager.awaitAnyTermination to block on all of them:

// start() returns immediately with a StreamingQuery handle,
// so both queries run concurrently
val query1 = df.writeStream.start()
val query2 = df.writeStream.start()

// Block until any one of the active queries terminates
spark.streams.awaitAnyTermination()
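
Applied to the code in the question, that means the builders should only start their queries, and the single blocking call moves to the end. A minimal sketch, assuming build() returns a started StreamingQuery and spark is the application's SparkSession:

case class SparkJobs(prop: Properties) extends Serializable {
  def run() = {
    // Start both streaming queries without blocking on either one
    Type1SparkJobBuilder(prop).build()
    Type2SparkJobBuilder(prop).build()

    // Block the main thread until any active query terminates
    spark.streams.awaitAnyTermination()
  }
}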

In addition to the above, Spark uses the FIFO scheduler by default, which means the first query gets all the resources in the cluster while it is executing. Since you're trying to run multiple queries concurrently, you should switch to the FAIR scheduler.

If some queries should get more resources than others, you can also tune the individual scheduler pools.
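
As a sketch of that configuration (the pool names are illustrative, pools with custom weights would be defined in a fairscheduler.xml file, and df stands for whatever streaming DataFrame each query reads), you can enable the FAIR scheduler and assign each query to its own pool before starting it:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MultipleStreamingQueries")
  // Replace the default FIFO scheduler so concurrent queries
  // share cluster resources instead of queueing
  .config("spark.scheduler.mode", "FAIR")
  .getOrCreate()

// Route each query to its own scheduler pool; the local property is
// read from the thread that starts the query
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
val query1 = df.writeStream.start()

spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
val query2 = df.writeStream.start()

spark.streams.awaitAnyTermination()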

answered Sep 23 '22 by Silvio