How to save the latest offset that Spark consumed to ZK or Kafka so it can be read back after a restart

I am using Kafka 0.8.2 to receive data from AdExchange, and I use Spark Streaming 1.4.1 to store the data in MongoDB.

My problem is that when I restart my Spark Streaming job, for instance to deploy a new version, fix a bug, or add a feature, it continues reading from whatever the latest Kafka offset is at that moment, so I lose the data that AdX pushed to Kafka while the job was restarting.

I tried something like auto.offset.reset -> smallest, but then it consumes everything from offset 0 up to the latest, so the data in the DB is huge and duplicated.

I also tried setting a specific group.id and consumer.id for Spark, but the result is the same (the setup is roughly the one sketched below).
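For reference, a minimal sketch of roughly how such parameters are passed to the Kafka stream; the broker list, group id, and topic name here are placeholders I made up, and streamingContext is assumed to be an existing StreamingContext:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder values -- the real job uses its own brokers, group id and topic.
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  "group.id"             -> "adx-consumer",
  "auto.offset.reset"    -> "smallest"   // replays the topic from the beginning, hence the duplicates
)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, Set("adx-events"))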

How can I save the latest offset Spark consumed to ZooKeeper or Kafka, so that after a restart the job can read that offset back and continue from there up to the latest one?

giaosudau asked Aug 06 '15 04:08


2 Answers

One of the overloads of the createDirectStream function takes a map that holds the topic and partition as the key and the offset from which you want to start consuming as the value.

Just look at the API here: http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html. The map I am talking about is usually called fromOffsets.

You can insert entries into the map:

startOffsetsMap.put(TopicAndPartition(topicName, partitionId), startOffset)

And use it when you create the direct stream (here messageHandler is a function that turns each MessageAndMetadata[String, String] record into whatever value you want in the stream, for example a (key, message) pair):

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
                streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))

After each iteration you can get the processed offsets using:

rdd.asInstanceOf[HasOffsetRanges].offsetRanges

You can use this data to construct the fromOffsets map the next time the job starts; a rough end-to-end sketch follows.
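For illustration, here is one way that loop could look with the offsets kept in ZooKeeper via Apache Curator. The znode layout, connection string, topic name, partition count, and helper names are my own assumptions rather than part of the original answer, and streamingContext / kafkaParams are assumed to exist as in the snippet above:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Assumed znode layout: /adx-offsets/<topic>/<partition> holds the next offset as a string.
val zk = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3))
zk.start()

def offsetPath(tp: TopicAndPartition) = s"/adx-offsets/${tp.topic}/${tp.partition}"

// Read back whatever offsets were stored on the previous run; partitions with no
// stored offset are simply left out of the map.
def loadOffsets(topic: String, partitions: Seq[Int]): Map[TopicAndPartition, Long] =
  partitions.flatMap { p =>
    val tp = TopicAndPartition(topic, p)
    Option(zk.checkExists().forPath(offsetPath(tp))).map { _ =>
      tp -> new String(zk.getData().forPath(offsetPath(tp)), "UTF-8").toLong
    }
  }.toMap

// Persist the end offset of each processed range.
def saveOffsets(ranges: Array[OffsetRange]): Unit = ranges.foreach { r =>
  val tp   = TopicAndPartition(r.topic, r.partition)
  val data = r.untilOffset.toString.getBytes("UTF-8")
  if (zk.checkExists().forPath(offsetPath(tp)) == null)
    zk.create().creatingParentsIfNeeded().forPath(offsetPath(tp), data)
  else
    zk.setData().forPath(offsetPath(tp), data)
}

// On (re)start: rebuild fromOffsets and hand it to the direct stream.
val startOffsetsMap = loadOffsets("adx-events", 0 until 4)   // topic name / partition count are placeholders
val messageHandler  = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  streamingContext, kafkaParams, startOffsetsMap, messageHandler)

stream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges   // grab this on the driver, before any shuffle
  // ... write the batch to MongoDB here ...
  saveOffsets(ranges)   // commit the offsets only after the output has succeeded
}

Committing the offsets only after the MongoDB write succeeds gives at-least-once delivery; duplicates on a retried batch then have to be absorbed by making the write idempotent (for example, upserts keyed on a natural id).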

You can see the full code and usage at the end of the page here: https://spark.apache.org/docs/latest/streaming-kafka-integration.html

Michael Kopaniov answered Oct 21 '22 06:10


To add to Michael Kopaniov's answer, if you really want to use ZK as the place you store and load your map of offsets from, you can.

However, because your results are not being output to ZK, you will not get reliable semantics unless your output operation is idempotent (which it sounds like it isn't).

If it's possible to store your results in the same document in Mongo, alongside the offsets, in a single atomic action, that might be better for you (see the sketch below).
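As one illustration of that idea, here is a minimal sketch assuming the batch output for each partition can be folded into a single Mongo document; the host, database, collection, and field names are placeholders, and stream is the direct stream from the previous answer:

import com.mongodb.{BasicDBObject, MongoClient}
import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val batchCount = rdd.count()   // stand-in for your real aggregation of the batch

  // In a real job you would reuse the client rather than open one per batch.
  val mongo = new MongoClient("localhost", 27017)
  val collection = mongo.getDB("adx").getCollection("partition_state")

  ranges.foreach { r =>
    // One document per topic/partition: the result and the offset it covers are
    // written together in a single upsert, so they cannot get out of sync.
    val query = new BasicDBObject("_id", s"${r.topic}-${r.partition}")
    val update = new BasicDBObject("$set",
      new BasicDBObject("untilOffset", r.untilOffset).append("lastBatchCount", batchCount))
    collection.update(query, update, true /* upsert */, false /* multi */)
  }
  mongo.close()
}

On restart you would read untilOffset back from those documents to rebuild the fromOffsets map, instead of (or in addition to) keeping the offsets in ZooKeeper.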

For more detail, see https://www.youtube.com/watch?v=fXnNEq1v3VA

Cody Koeninger answered Oct 21 '22 07:10