 

How to set offset committed by the consumer group using Spark's Direct Stream for Kafka?

I am trying to use Spark's Direct Approach (No Receivers) for Kafka, and I have the following Kafka configuration map:

configMap.put("zookeeper.connect","192.168.51.98:2181");
configMap.put("group.id", UUID.randomUUID().toString());
configMap.put("auto.offset.reset","smallest");
configMap.put("auto.commit.enable","true");
configMap.put("topics","IPDR31");
configMap.put("kafka.consumer.id","kafkasparkuser");
configMap.put("bootstrap.servers","192.168.50.124:9092");

Now my objective is: if my Spark pipeline crashes and is started again, the stream should resume from the latest offset committed by the consumer group. For that purpose, I want to specify the starting offsets for the consumer. I already know the committed offset for each partition. How can I supply this information to the streaming function? Currently I am using:

JavaPairInputDStream<byte[], byte[]> kafkaData =
    KafkaUtils.createDirectStream(js, byte[].class, byte[].class,
        DefaultDecoder.class, DefaultDecoder.class, configMap, topic);

abi_pat


2 Answers

Look at the second form of createDirectStream in the Spark API docs - it allows you to pass in a Map<TopicAndPartition, Long>, where the Long is the offset.

Note that Spark will not automatically update your offsets in ZooKeeper when using the direct stream - you have to write them yourself, either to ZK or to some other store. Unless you have a strict requirement for exactly-once semantics, it will be easier to use the createStream method to get back a DStream, in which case Spark will update the offsets in ZK and resume from the last stored offset in the case of failure.
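For illustration, here is a minimal sketch of that overload, assuming the Kafka 0.8 direct API from spark-streaming-kafka and reusing js and configMap from the question; the partition numbers and offset values are made up, and in practice you would load them from wherever you persisted them:

import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// Starting offsets you stored yourself (values here are illustrative).
Map<TopicAndPartition, Long> fromOffsets = new HashMap<TopicAndPartition, Long>();
fromOffsets.put(new TopicAndPartition("IPDR31", 0), 1050L);
fromOffsets.put(new TopicAndPartition("IPDR31", 1), 2030L);

// This overload takes the explicit starting offsets plus a message handler
// that maps each MessageAndMetadata to the record type of the stream
// (here just the raw message bytes).
JavaInputDStream<byte[]> kafkaData =
    KafkaUtils.createDirectStream(
        js, byte[].class, byte[].class,
        DefaultDecoder.class, DefaultDecoder.class,
        byte[].class, configMap, fromOffsets,
        new Function<MessageAndMetadata<byte[], byte[]>, byte[]>() {
            @Override
            public byte[] call(MessageAndMetadata<byte[], byte[]> mmd) {
                return mmd.message();
            }
        });

To know which offsets to store, you can read the offsets of each batch inside foreachRDD by casting the underlying RDD to HasOffsetRanges and calling offsetRanges(), then write them to ZK or your database once the batch has been processed.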


Ramesh Sen


For your requirement, the correct solution is to use checkpointing. For every batch processed, the checkpoint writes metadata to the shared storage you specify (typically HDFS). It is metadata, not the real data, so there is no real performance impact.

If the Spark process crashes and is restarted, it will first read the checkpoint and resume from the offsets saved there.
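A minimal sketch of that recovery pattern, assuming Spark 1.4+ (where JavaStreamingContext.getOrCreate takes a Function0 factory); the checkpoint directory, app name and batch interval are illustrative:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

final String checkpointDir = "hdfs:///user/spark/checkpoints/ipdr31";

// Factory that builds a fresh context (and the Kafka direct stream) on a clean start.
Function0<JavaStreamingContext> createContext = new Function0<JavaStreamingContext>() {
    @Override
    public JavaStreamingContext call() {
        SparkConf conf = new SparkConf().setAppName("KafkaDirectWithCheckpoint");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        jssc.checkpoint(checkpointDir);
        // create the direct stream and the rest of the pipeline here ...
        return jssc;
    }
};

// On a clean start the factory runs; after a crash the context (including the
// consumed offsets) is rebuilt from the checkpoint instead.
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, createContext);
jssc.start();
jssc.awaitTermination();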

You can refer to this sample code, where I use Spark Streaming to write data to Elasticsearch reliably using checkpointing: https://github.com/atulsm/Test_Projects/blob/master/src/spark/StreamingKafkaRecoverableDirectEvent.java


Atul Soman