Spark Structured Streaming Multiple WriteStreams to Same Sink

Two writeStream queries to the same database sink are not executing in sequence in Spark Structured Streaming 2.2.1. Please suggest how to make them execute in sequence.

val deleteSink = ds1.writeStream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

val UpsertSink = ds2.writeStream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

deleteSink.awaitTermination()
UpsertSink.awaitTermination()

With the above code, deleteSink ends up executing after UpsertSink.

asked Jun 11 '18 by Shiva Kumar M V

People also ask

Which property must a Spark Structured Streaming sink possess to ensure end-to-end exactly-once semantics?

Exactly-once semantics are only possible if the source is replayable and the sink is idempotent.
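To illustrate idempotency, here is a minimal sketch of a `ForeachWriter`-based JDBC sink. The table name, column names, and connection details are hypothetical; the key point is that an upsert keyed on a unique `id` makes replays safe, since rewriting the same row produces the same final state:

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.ForeachWriter

// Hypothetical idempotent sink: reprocessing a micro-batch after a failure
// overwrites rows instead of duplicating them.
class IdempotentUpsertSink(url: String, user: String, pass: String)
    extends ForeachWriter[(Int, String)] {

  var conn: Connection = _
  var stmt: PreparedStatement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    conn = DriverManager.getConnection(url, user, pass)
    // MySQL-style upsert keyed on the primary key `id`
    stmt = conn.prepareStatement(
      "INSERT INTO events (id, value) VALUES (?, ?) " +
      "ON DUPLICATE KEY UPDATE value = VALUES(value)")
    true
  }

  override def process(row: (Int, String)): Unit = {
    stmt.setInt(1, row._1)
    stmt.setString(2, row._2)
    stmt.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}
```

A plain `INSERT` sink would not be idempotent: a replayed batch would insert the same rows twice.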

Is there any difference between Spark streaming and Spark structured streaming?

Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, both APIs are optimized by the Catalyst optimizer and translated into RDDs for execution under the hood.

Does Spark streaming support batch operations?

Apache Spark Streaming is a scalable fault-tolerant streaming processing system that natively supports both batch and streaming workloads.

Can the Spark Structured Streaming API be used to process graph data?

Yes, the Spark Structured Streaming API can be used to process graph data. Spark Structured Streaming treats live data as a table that is continuously appended. Because of this underlying table model, its functions and operations are similar to batch processing.
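A minimal sketch of that table model, assuming a `SparkSession` named `spark` and a socket source on port 9999 (both illustrative): the same batch-style `groupBy`/`count` works on the stream because Spark treats it as an unbounded, continuously appended table.

```scala
// Read a stream of lines; Spark models it as an unbounded table of rows.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// A batch-style aggregation over the continuously appended "table".
val counts = lines.groupBy("value").count()

// Emit the full updated result table to the console on every trigger.
val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
```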


1 Answer

If you want to have two streams running in parallel, you have to use

sparkSession.streams.awaitAnyTermination()

instead of

deleteSink.awaitTermination()
UpsertSink.awaitTermination()

In your case, the call UpsertSink.awaitTermination() will never be reached unless deleteSink is stopped or fails with an exception, as the Scaladoc explains:

Waits for the termination of this query, either by query.stop() or by an exception. If the query has terminated with an exception, then the exception will be thrown. If the query has terminated, then all subsequent calls to this method will either return immediately (if the query was terminated by stop()), or throw the exception immediately (if the query has terminated with exception).
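Putting it together, a sketch of the corrected driver code (assuming the question's `ds1`, `ds2`, `mydbsink`, and a `SparkSession` named `spark`):

```scala
// Both queries start immediately and run concurrently.
val deleteSink = ds1.writeStream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

val UpsertSink = ds2.writeStream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

// Block the driver until any active query terminates, via stop() or an
// exception, instead of waiting on one specific query at a time.
spark.streams.awaitAnyTermination()
```

Note that with two concurrent queries the delete/upsert ordering against the database is not guaranteed; if strict sequencing is required, the two operations would need to be combined into a single query or coordinated inside the sink.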

answered Sep 22 '22 by Shikkou