Spark Structured Streaming consistency across sinks

I'd like to better understand the consistency model of Spark 2.2 Structured Streaming in the following case:

  • one source (Kinesis)
  • two queries from this source to two different sinks: one file sink for archiving (S3), and another sink for processed data (DB or file, not yet decided)

I'd like to understand if there's any consistency guarantee across sinks, at least under certain circumstances:

  • Can one of the sinks be way ahead of the other? Or do they consume data from the source at the same speed (since it's the same source)? Can they be synchronous?
  • If I (gracefully) stop the streaming application, will the data on the two sinks be consistent?

The reason is that I'd like to build a Kappa-like processing app, with the ability to suspend/shut down the streaming part when I want to reprocess some history, and, when I resume streaming, avoid reprocessing something that has already been processed (as part of the history), or missing some data (e.g. data that was not yet committed to the archive and is then skipped as already processed when streaming resumes).

asked Nov 07 '17 by mathieu




2 Answers

One important thing to keep in mind is that the two sinks will be used by two distinct queries, each reading independently from the source. So checkpointing is done per query.

Whenever you call start on a DataStreamWriter, that results in a new query, and if you set checkpointLocation, each query will have its own checkpoint directory to track its offsets into the source.

val input = spark.readStream....

val query1 = input.select('colA, 'colB)
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir1")
  .start("/path1")

val query2 = input.select('colA, 'colB)
  .writeStream
  .format("csv")
  .option("checkpointLocation", "path/to/checkpoint/dir2")
  .start("/path2")

So each query reads from the source and tracks offsets independently. This also means each query can be at a different offset of the input stream, and you can restart either or both without impacting the other, as sketched below.
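For illustration, a minimal sketch of restarting one query independently, reusing the hypothetical names and paths from the snippet above:

// Gracefully stop query1; query2 keeps running against the same source.
query1.stop()

// Restarting with the same checkpointLocation resumes from query1's last
// committed offsets, without affecting query2's progress.
val query1Resumed = input.select('colA, 'colB)
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir1")
  .start("/path1")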

UPDATE

I wanted to make another suggestion now that Databricks Delta has been open sourced. A common pattern I've used is landing data from upstream sources directly into an append-only Delta table. Then, with Structured Streaming, you can efficiently subscribe to the table and process the new records incrementally. Delta's internal transaction log is more efficient than the S3 file listings required by the basic file source. This ensures you have a consistent source of data across multiple queries, pulling from S3 rather than from Kinesis directly; a rough sketch follows.
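The sketch below illustrates that pattern. The paths, bucket name, and source format string are placeholders (the exact options depend on which Kinesis connector you use), and format("delta") assumes the Delta Lake library is on the classpath:

// 1) Land the raw stream into an append-only Delta table (the archive).
val raw = spark.readStream
  .format("kinesis")   // illustrative; options depend on your Kinesis connector
  .load()

raw.writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoints/landing")
  .start("s3://bucket/landing")

// 2) Downstream queries subscribe to the Delta table as a streaming source;
//    the transaction log tells them exactly which files are new.
val processed = spark.readStream
  .format("delta")
  .load("s3://bucket/landing")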

answered Nov 20 '22 by Silvio


What Silvio has written is absolutely correct. Writing to two sinks will start two streaming queries running independently of each other (effectively two streaming applications, each reading the same data, processing it, and checkpointing on its own).

I just want to add that if you want both queries to stop/pause at the same time when either one fails or is restarted, there is the option of using the API awaitAnyTermination().

Instead of using:

query.start().awaitTermination()

use:

sparkSession.streams.awaitAnyTermination()

Adding excerpts from the API documentation:

/**
   * Wait until any of the queries on the associated SQLContext has terminated since the
   * creation of the context, or since `resetTerminated()` was called. If any query was terminated
   * with an exception, then the exception will be thrown.
   *
   * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
   * return immediately (if the query was terminated by `query.stop()`),
   * or throw the exception immediately (if the query was terminated with exception). Use
   * `resetTerminated()` to clear past terminations and wait for new terminations.
   *
   * In the case where multiple queries have terminated since `resetTerminated()` was called,
   * if any query has terminated with exception, then `awaitAnyTermination()` will
   * throw any of the exception. For correctly documenting exceptions across multiple queries,
   * users need to stop all of them after any of them terminates with exception, and then check the
   * `query.exception()` for each query.
   *
   * @throws StreamingQueryException if any query has terminated with an exception
   *
   * @since 2.0.0
   */
  @throws[StreamingQueryException]
  def awaitAnyTermination(): Unit = {
    awaitTerminationLock.synchronized {
      while (lastTerminatedQuery == null) {
        awaitTerminationLock.wait(10)
      }
      if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) {
        throw lastTerminatedQuery.exception.get
      }
    }
  }
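
Building on that doc, a small hypothetical pattern (not from the original answer) for keeping the two sinks halting close together: when either query terminates, stop whatever is still active, so neither sink runs far ahead of the other:

try {
  sparkSession.streams.awaitAnyTermination()  // returns or throws when query1 or query2 terminates
} finally {
  // Stop any queries still running so both sinks stop near the same point.
  sparkSession.streams.active.foreach(_.stop())
}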

answered Nov 20 '22 by chandan prakash