I'd like to better understand the consistency model of Spark 2.2 Structured Streaming in the following case:
I'd like to understand whether there is any consistency guarantee across sinks, at least under certain circumstances:
The reason is that I'd like to build a Kappa-like processing app, with the ability to suspend or shut down the streaming part when I want to reprocess some history. When I resume the streaming, I want to avoid reprocessing something that has already been processed (because it was in the history), and to avoid missing data (e.g. data that had not been committed to the archive and is then skipped as already processed when the streaming resumes).
Exactly-once semantics are only possible if the source is replayable and the sink is idempotent.
Sink is part of Data Source API V1 and is used in Micro-Batch Stream Processing only. It is used exclusively when the MicroBatchExecution stream execution engine is requested to add a streaming batch to a sink (the addBatch phase) while running an active streaming query.
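To make the replayable-source and idempotent-sink requirement above concrete, here is a minimal sketch in plain Scala with no Spark dependency. IdempotentSink and its members are hypothetical names for illustration; only the idea of an addBatch call identified by a batch id comes from the text. The sink remembers the highest batch id it has committed and ignores a replay of that batch, so re-running a batch after a failure does not duplicate output.

```scala
// Hypothetical, Spark-free model of an idempotent sink.
// A real sink would have to persist `lastCommitted` atomically with the data.
final class IdempotentSink {
  private var lastCommitted: Long = -1L
  private val buffer = scala.collection.mutable.ArrayBuffer.empty[String]

  /** Returns true if the batch was written, false if it was a replay. */
  def addBatch(batchId: Long, rows: Seq[String]): Boolean =
    if (batchId <= lastCommitted) false
    else {
      buffer ++= rows
      lastCommitted = batchId
      true
    }

  def contents: Seq[String] = buffer.toList
}

object IdempotentSinkDemo {
  def main(args: Array[String]): Unit = {
    val sink = new IdempotentSink
    sink.addBatch(0L, Seq("a", "b"))
    sink.addBatch(1L, Seq("c"))
    // Simulate a restart that replays batch 1 from a replayable source.
    sink.addBatch(1L, Seq("c"))
    println(sink.contents.mkString(",")) // prints "a,b,c"
  }
}
```

The replayed batch is dropped because its id is not greater than the last committed one, which is the property that lets a replayable source be re-read safely.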
Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, all the APIs are optimized using the Spark Catalyst optimizer and translated into RDDs for execution under the hood.
Now that the Direct API of Spark Streaming (we currently have version 2.3.2) is deprecated, and we recently added the Confluent platform (which comes with Kafka 2.2.0) to our project, we plan to migrate these applications.
One important thing to keep in mind is the 2 sinks will be used from 2 distinct queries, each reading independently from the source. So checkpointing is done per-query.
Whenever you call start on a DataStreamWriter, that results in a new query, and if you set checkpointLocation, each query will have its own checkpointing to track offsets from the source.
val input = spark.readStream....

val query1 = input.select('colA, 'colB)
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir1")
  .start("/path1")

val query2 = input.select('colA, 'colB)
  .writeStream
  .format("csv")
  .option("checkpointLocation", "path/to/checkpoint/dir2")
  .start("/path2")
So each query reads from the source and tracks offsets independently. This also means that each query can be at a different offset of the input stream, and you can restart either or both without impacting the other.
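The independence of the two queries can be modelled without Spark. In this sketch, SourceLog and ModelQuery are hypothetical stand-ins (they are not Spark classes): each query keeps its own committed offset into a shared, replayable log, so one query can lag behind or restart without affecting the other.

```scala
// Hypothetical model: a shared replayable log and two queries,
// each with its own checkpointed offset. These are not Spark classes.
final class SourceLog(val records: Vector[String])

final class ModelQuery(source: SourceLog) {
  // Stands in for the per-query checkpoint directory.
  private var committedOffset: Int = 0

  /** Processes up to `max` new records and commits the new offset. */
  def runBatch(max: Int): Vector[String] = {
    val batch = source.records.slice(committedOffset, committedOffset + max)
    committedOffset += batch.size
    batch
  }

  def offset: Int = committedOffset
}

object IndependentOffsetsDemo {
  def main(args: Array[String]): Unit = {
    val log = new SourceLog(Vector("r0", "r1", "r2", "r3"))
    val q1 = new ModelQuery(log)
    val q2 = new ModelQuery(log)

    q1.runBatch(3) // q1 reads r0..r2
    q2.runBatch(1) // q2 reads only r0

    // The two queries now sit at different offsets of the same input.
    println(s"q1=${q1.offset} q2=${q2.offset}") // prints "q1=3 q2=1"
  }
}
```

Because nothing ties the two offsets together, stopping both queries at an arbitrary moment leaves the two outputs covering different prefixes of the input.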
UPDATE
I wanted to make another suggestion now that Databricks Delta is open sourced. A common pattern I've used is landing data from upstream sources directly into an append-only Delta table. Then, with Structured Streaming, you can efficiently subscribe to the table and process the new records incrementally. Delta's internal transaction log is more efficient than S3 file listings required with the basic file source. This ensures you have a consistent source of data across multiple queries, pulling from S3 vs Kinesis.
What Silvio has written is absolutely correct. Writing to 2 sinks will start two streaming queries running independently of each other (effectively 2 streaming applications reading the same data twice, processing it twice, and checkpointing on their own).
I just want to add that if you want both queries to stop at the same time when either one of them fails or is stopped, you can use the awaitAnyTermination() API. It returns (or throws) as soon as any query terminates, so the driver can then shut down the remaining queries.
Instead of using:
query.start().awaitTermination()
use:
sparkSession.streams.awaitAnyTermination()
Here is an excerpt from the API documentation:
/**
 * Wait until any of the queries on the associated SQLContext has terminated since the
 * creation of the context, or since `resetTerminated()` was called. If any query was terminated
 * with an exception, then the exception will be thrown.
 *
 * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
 * return immediately (if the query was terminated by `query.stop()`),
 * or throw the exception immediately (if the query was terminated with exception). Use
 * `resetTerminated()` to clear past terminations and wait for new terminations.
 *
 * In the case where multiple queries have terminated since `resetTermination()` was called,
 * if any query has terminated with exception, then `awaitAnyTermination()` will
 * throw any of the exception. For correctly documenting exceptions across multiple queries,
 * users need to stop all of them after any of them terminates with exception, and then check the
 * `query.exception()` for each query.
 *
 * @throws StreamingQueryException if any query has terminated with an exception
 *
 * @since 2.0.0
 */
@throws[StreamingQueryException]
def awaitAnyTermination(): Unit = {
  awaitTerminationLock.synchronized {
    while (lastTerminatedQuery == null) {
      awaitTerminationLock.wait(10)
    }
    if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) {
      throw lastTerminatedQuery.exception.get
    }
  }
}
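Following the documentation excerpt above, a driver that stops every query once any one of them terminates could be sketched as below. This is a sketch only and has not been tested against Spark 2.2 specifically. StopTogether and stopAllOnFirstTermination are hypothetical names, and the helper assumes the queries were already started on the given SparkSession. It relies only on streams.awaitAnyTermination(), streams.active, StreamingQuery.stop(), StreamingQuery.id and StreamingQuery.exception.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

object StopTogether {
  // Hypothetical helper: blocks until any query ends, stops the rest,
  // and returns (query id, exception) for every query that failed.
  def stopAllOnFirstTermination(
      spark: SparkSession,
      queries: Seq[StreamingQuery]): Seq[(String, StreamingQueryException)] = {
    try {
      spark.streams.awaitAnyTermination()
    } catch {
      // Swallowed here on purpose; failures are collected per query below.
      case _: StreamingQueryException => ()
    } finally {
      // Stop whatever is still running so that all queries halt together.
      spark.streams.active.foreach(_.stop())
    }
    // As the excerpt suggests, check `exception` on each query afterwards.
    queries.flatMap(q => q.exception.toList.map(e => (q.id.toString, e)))
  }
}
```

Stopping the queries together does not align their offsets: each query still resumes from its own checkpoint, so the two sinks may have advanced to different points of the input.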