
How to manually commit offset in Spark Kafka direct streaming?

I looked around hard but didn't find a satisfactory answer to this. Maybe I'm missing something. Please help.

We have a Spark streaming application consuming a Kafka topic, which needs to ensure end-to-end processing before advancing Kafka offsets, e.g. updating a database. This is much like building transaction support within the streaming system, and guaranteeing that each message is processed (transformed) and, more importantly, output.

I have read about Kafka direct streams. The documentation says that for robust failure recovery in direct-streaming mode, Spark checkpointing should be enabled, which stores the offsets along with the checkpoints. But the offset management is done internally (by setting Kafka config params like ["auto.offset.reset", "auto.commit.enable", "auto.offset.interval.ms"]). It does not say how (or whether) we can customize committing offsets (once we've updated the database, for example). In other words, can we set "auto.commit.enable" to false and manage the offsets (not unlike a DB connection) ourselves?
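(For reference: the later spark-streaming-kafka-0-10 integration exposes exactly this pattern. With `enable.auto.commit` set to `false`, you can read the consumed offset ranges from each batch's RDD and commit them back to Kafka yourself only after your output step has succeeded. A minimal sketch; the broker address, topic, group id, the `ssc` StreamingContext, and the `saveToDatabase` function are placeholders:)

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",          // placeholder broker
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "my-consumer-group",       // placeholder group
  "auto.offset.reset"  -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean) // we commit manually
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent,
  Subscribe[String, String](Seq("my-topic"), kafkaParams))

stream.foreachRDD { rdd =>
  // Offset ranges for this batch, captured before any output action.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  saveToDatabase(rdd)  // placeholder: your end-to-end output step
  // Commit only after the output has succeeded (at-least-once semantics).
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```

This keeps the offsets in Kafka's own commit log, but advances them only on your schedule; a failure before `commitAsync` simply replays the batch.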

Any guidance/help is greatly appreciated.

asked Jul 28 '16 by TroubleShooter


People also ask

How do you manually commit offset in Kafka?

Using the Kafka consumer API: manually assign a list of partitions to the consumer (or get the set of partitions currently assigned to it), poll for records, and then commit the offsets returned on the last poll() for all subscribed topics and partitions. Close the consumer when done, waiting for any needed cleanup.

How does spark streaming record its offset?

Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures.

What does it mean to commit an offset in Kafka?

It commits the offset, indicating that all the previous records from that partition have been processed. So, if a consumer stops and comes back later, it restarts from the last committed position (if assigned to that partition again). Note that this behavior is configurable.
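This resume-from-last-commit behavior can be simulated without a broker. A minimal sketch (no Kafka involved): the committed offset is the position of the next record to read, so processing after a restart picks up exactly where the last commit left off.

```scala
// Pure-logic simulation of Kafka commit semantics for one partition.
object CommitSemantics {
  // `committed` holds the offset of the next unread record.
  final case class PartitionState(records: Vector[String], var committed: Long = 0L)

  // Read up to `n` records starting at the committed offset, then commit.
  def pollAndCommit(p: PartitionState, n: Int): Vector[String] = {
    val batch = p.records.slice(p.committed.toInt, p.committed.toInt + n)
    // ... process the batch here (e.g. write it to a database) ...
    p.committed += batch.size // commit: advance to the next unread record
    batch
  }
}
```

A consumer that stops after the first call and "comes back later" continues from the committed position, not from the beginning.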

Does Kafka producer commit offset?

So, Kafka will commit your current offset every five seconds. Auto-commit is a convenient option, but it may cause records to be processed a second time. Let us understand it with an example: you have some messages in the partition, and you have made your first poll request.


1 Answer

The article below could be a good starting point for understanding the approach.

spark-kafka-achieving-zero-data-loss

Furthermore:

The article suggests using the ZooKeeper client directly, which could also be replaced by something like KafkaSimpleConsumer. The advantage of using ZooKeeper (or KafkaSimpleConsumer) is that monitoring tools which depend on ZooKeeper-saved offsets keep working. The offset information can also be saved on HDFS or any other reliable storage service.
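Whichever store you pick (ZooKeeper, HDFS, a database row), the bookkeeping is the same: persist a topic/partition-to-offset map after a successful batch, and read it back on restart to resume from those offsets. A minimal sketch of the serialization half, with the actual store left to whatever client you use:

```scala
// Serialize per-partition offsets to a plain text blob that can be written
// to ZooKeeper, HDFS, or a database column, and parsed back on restart.
object OffsetStore {
  // One "topic,partition,offset" line per partition, in stable order.
  def serialize(offsets: Map[(String, Int), Long]): String =
    offsets.toSeq
      .sortBy { case ((topic, part), _) => (topic, part) }
      .map { case ((topic, part), off) => s"$topic,$part,$off" }
      .mkString("\n")

  // Inverse of serialize: rebuild the map to resume consumption from.
  def deserialize(s: String): Map[(String, Int), Long] =
    s.split("\n").filter(_.nonEmpty).map { line =>
      val Array(topic, part, off) = line.split(",")
      ((topic, part.toInt), off.toLong)
    }.toMap
}
```

On restart, the deserialized map is what you would feed to the direct stream as its starting offsets.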

answered Nov 09 '22 by rakesh