 

How to get Kafka offsets for structured query for manual and reliable offset management?

Spark 2.2 introduced a Kafka structured streaming source. As I understand it, it relies on an HDFS checkpoint directory to store offsets and guarantee "exactly-once" message delivery.

But old docs (like https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/) say that Spark Streaming checkpoints are not recoverable across applications or Spark upgrades and hence not very reliable. As a solution, a common practice is to store offsets in external storage that supports transactions, like MySQL or Amazon Redshift.

If I want to store offsets from the Kafka source in a transactional DB, how can I obtain offsets from a structured streaming batch?

Previously, this could be done by casting the RDD to HasOffsetRanges:

import org.apache.spark.streaming.kafka010.HasOffsetRanges
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

But with the new Structured Streaming API, I have a Dataset of InternalRow and I can't find an easy way to fetch offsets. The Sink API has only the addBatch(batchId: Long, data: DataFrame) method, so how am I supposed to get the offsets for a given batch id?
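
To illustrate what I mean, here is a bare-bones custom sink (the class name is made up; Sink is Spark's internal org.apache.spark.sql.execution.streaming.Sink trait): addBatch hands me only the batch id and the rows, with no offsets anywhere:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

class MyOffsetTrackingSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // only the batch id and the data -- nothing about Kafka offsets
  }
}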

asked Sep 11 '17 by dnaumenko


1 Answer

Spark 2.2 introduced a Kafka structured streaming source. As I understand it, it relies on an HDFS checkpoint directory to store offsets and guarantee "exactly-once" message delivery.

Correct.

On every trigger, Spark Structured Streaming saves offsets to the offsets directory in the checkpoint location (defined using the checkpointLocation option or the spark.sql.streaming.checkpointLocation Spark property, or randomly assigned), which is supposed to guarantee that offsets are processed at most once. The feature is called Write-Ahead Logs.

The other directory in the checkpoint location is the commits directory for completed streaming batches, with a single file per batch (the file name being the batch id).
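
To make this concrete, here is a minimal sketch of a Kafka query with an explicit checkpoint location (assuming the spark-sql-kafka-0-10 artifact is on the classpath; the broker address, topic name and paths are placeholders):

// placeholders: local broker, topic "topic1", paths under /tmp
val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load()
  .writeStream
  .format("parquet")
  .option("path", "/tmp/output")
  .option("checkpointLocation", "/tmp/checkpoint") // offsets/ and commits/ end up here
  .start()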

Quoting the official documentation in Fault Tolerance Semantics:

To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers) to track the read position in the stream. The engine uses checkpointing and write ahead logs to record the offset range of the data being processed in each trigger. The streaming sinks are designed to be idempotent for handling reprocessing. Together, using replayable sources and idempotent sinks, Structured Streaming can ensure end-to-end exactly-once semantics under any failure.

Every time a trigger is executed, StreamExecution checks the directories and "computes" what offsets have been processed already. That gives you at-least-once semantics and exactly-once in total.
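
For illustration, with a checkpoint location of /tmp/checkpoint (a made-up path), the offset log for batch 1 of a Kafka source could look roughly like this: a version line, a line of batch metadata, and one line of JSON offsets per source (all values here are made up):

$ cat /tmp/checkpoint/offsets/1
v1
{"batchWatermarkMs":0,"batchTimestampMs":1505126718000,"conf":{"spark.sql.shuffle.partitions":"200"}}
{"topic1":{"0":42,"1":17}}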

But old docs (...) say that Spark Streaming checkpoints are not recoverable across applications or Spark upgrades and hence not very reliable.

There was a reason why you called them "old", wasn't there?

They refer to the old and (in my opinion) dead Spark Streaming that kept not only offsets but the entire query code, which led to situations where checkpointing was almost unusable, e.g. when you changed the code.

Those times are over now, and Structured Streaming is more cautious about what is checkpointed and when.

If I want to store offsets from the Kafka source in a transactional DB, how can I obtain offsets from a structured streaming batch?

A solution could be to implement, or somehow reuse, the MetadataLog interface that is used to deal with offset checkpointing. That could work.
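
For reference, this is roughly what the internal contract looks like (org.apache.spark.sql.execution.streaming.MetadataLog as of Spark 2.2; being an internal API, it can change between releases):

trait MetadataLog[T] {
  def add(batchId: Long, metadata: T): Boolean   // store metadata for a batch
  def get(batchId: Long): Option[T]              // retrieve metadata for a batch
  def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)]
  def getLatest(): Option[(Long, T)]
  def purge(thresholdBatchId: Long): Unit        // remove old entries
}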

how am I supposed to get the offsets for a given batch id?

It is not currently possible.

My understanding is that you will not be able to do it, as the semantics of streaming are hidden from you. You simply should not be dealing with this low-level "thing" called offsets that Spark Structured Streaming uses to offer exactly-once guarantees.

Quoting Michael Armbrust from his talk at Spark Summit Easy, Scalable, Fault Tolerant Stream Processing with Structured Streaming in Apache Spark:

you should not have to reason about streaming

and further in the talk (on the next slide):

you should write simple queries & Spark should continuously update the answer


There is a way to get offsets (from any source, Kafka included) using StreamingQueryProgress, which you can intercept with a StreamingQueryListener and its onQueryProgress callback.

onQueryProgress(event: QueryProgressEvent): Unit
Called when there is some status update (ingestion rate updated, etc.)

With StreamingQueryProgress you can access the sources property with SourceProgress entries, which give you what you want: in particular, the startOffset and endOffset of every source for each batch.
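
For example, a minimal sketch of such a listener (the database write is only indicated in a comment; table and connection details would be your own):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    p.sources.foreach { s =>
      // startOffset and endOffset are JSON strings,
      // e.g. {"topic1":{"0":42,"1":17}} for a Kafka source
      println(s"batch=${p.batchId} source=${s.description} endOffset=${s.endOffset}")
      // here you would INSERT (p.batchId, s.endOffset) into your transactional DB
    }
  }
})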

answered Sep 29 '22 by Jacek Laskowski