I am trying to use Spark's Direct Approach (No Receivers) for Kafka. I have the following Kafka configuration map:
configMap.put("zookeeper.connect","192.168.51.98:2181");
configMap.put("group.id", UUID.randomUUID().toString());
configMap.put("auto.offset.reset","smallest");
configMap.put("auto.commit.enable","true");
configMap.put("topics","IPDR31");
configMap.put("kafka.consumer.id","kafkasparkuser");
configMap.put("bootstrap.servers","192.168.50.124:9092");
Now my objective is that if my Spark pipeline crashes and is restarted, the stream should resume from the latest offset committed by the consumer group. For that purpose, I want to specify the starting offsets for the consumer. I have the committed offset for each partition; how can I supply this information to the streaming function? Currently I am using:
JavaPairInputDStream<byte[], byte[]> kafkaData =
    KafkaUtils.createDirectStream(js, byte[].class, byte[].class,
        DefaultDecoder.class, DefaultDecoder.class, configMap, topic);
Look at the second form of createDirectStream in the Spark API docs - it allows you to pass in a Map<TopicAndPartition, Long>, where the Long is the starting offset.
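A sketch of that overload, assuming Spark 1.x with the spark-streaming-kafka (Kafka 0.8) artifact; the partition numbers and offset values here are purely illustrative, and `js`/`configMap` are the context and parameter map from your question:

```java
import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

// Build the starting offsets from wherever you persisted them (ZK, a DB, ...).
Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
fromOffsets.put(new TopicAndPartition("IPDR31", 0), 1234L); // partition 0, illustrative offset
fromOffsets.put(new TopicAndPartition("IPDR31", 1), 5678L); // partition 1, illustrative offset

// The message handler maps each MessageAndMetadata to whatever record type you want;
// here it just keeps the raw key/value pair.
JavaInputDStream<Tuple2<byte[], byte[]>> kafkaData =
    KafkaUtils.createDirectStream(
        js,                                    // your JavaStreamingContext
        byte[].class, byte[].class,
        DefaultDecoder.class, DefaultDecoder.class,
        (Class<Tuple2<byte[], byte[]>>) (Class<?>) Tuple2.class,
        configMap,
        fromOffsets,
        (Function<MessageAndMetadata<byte[], byte[]>, Tuple2<byte[], byte[]>>)
            mmd -> new Tuple2<>(mmd.key(), mmd.message()));
```

Note that this overload returns a JavaInputDStream of your record type rather than a JavaPairInputDStream, because the message handler decides the output shape.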
Note that Spark will not automatically update your offsets in ZooKeeper when using a direct stream - you have to write them yourself, either to ZK or some other durable store. Unless you have a strict requirement for exactly-once semantics, it is easier to use the createStream method to get back a DStream, in which case Spark will update the offsets in ZK and resume from the last stored offset in the case of failure.
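A common pattern for writing the offsets yourself is to read each batch's offset ranges off the underlying RDD. A sketch, where storeOffsets is a hypothetical helper standing in for your own ZK or database write:

```java
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

kafkaData.foreachRDD(rdd -> {
    // The direct stream's RDDs implement HasOffsetRanges, exposing the
    // Kafka offset range consumed for each partition in this batch.
    OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // ... process the batch here ...

    for (OffsetRange range : ranges) {
        // storeOffsets is a hypothetical helper: persist topic, partition, and
        // untilOffset only after processing succeeds, so a restart can resume here.
        storeOffsets(range.topic(), range.partition(), range.untilOffset());
    }
});
```

On restart, the stored untilOffset values become the fromOffsets map passed to createDirectStream.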
For your requirement, the correct solution is to use checkpointing. For every batch processed, the checkpoint writes metadata to the shared storage you specify (typically HDFS). It is metadata, not the actual data, so there is no real performance impact.
If the Spark process crashes and is restarted, it will first read the checkpoint and resume from the offsets saved there.
You can refer to this sample code, where I use Spark Streaming to write data to Elasticsearch reliably using checkpointing: https://github.com/atulsm/Test_Projects/blob/master/src/spark/StreamingKafkaRecoverableDirectEvent.java