I am trying to verify how manual offset commits work.
When I try to exit the job, whether by calling Thread.sleep() or jssc.stop() or by throwing an exception in the while loop, I see that offsets are still being committed.
I am sending just a couple of messages to test, but I see 0 lag as soon as the job starts processing the batch.
When does Spark actually commit the offsets?
JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
kafkaStream.foreachRDD(kafkaStreamRDD -> {
    // fetch the Kafka offsets of this batch so they can be committed manually later
    OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaStreamRDD.rdd()).offsetRanges();
    // filter unwanted data
    kafkaStreamRDD.filter(new Function<ConsumerRecord<String, String>, Boolean>() {
        // filter logic here
    }).foreachPartition(kafkaRecords -> {
        // initializing DB connections
        while (kafkaRecords.hasNext()) {
            // doing some work here
            // -----> EXCEPTION
            throw new Exception();
        }
    });
    // commit offsets after processing
    ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
        if (exception != null) {
            System.out.println("-------------Unable to commit offsets, something went wrong, trace ------------" + exception.getCause());
            exception.printStackTrace(); // need this for driver
        } else {
            System.out.println("Successfully committed offsets"); // need this for driver
            for (OffsetRange offsetRange : offsetRanges) {
                System.out.println("Offset Info: partition " + offsetRange.partition() + ", fromOffset " + offsetRange.fromOffset() + ", untilOffset " + offsetRange.untilOffset());
            }
        }
    });
});
Kafka consumer config: enable.auto.commit : false
Note the throw new Exception(); in the while loop. Even when the batch fails because of the exception, I see the offsets committed. I expected some lag here since processing failed. What is wrong here?
A nice feature of Spark Streaming's Kafka integration is that it lets you commit offsets manually, something Kafka Streams handles for you rather than exposing directly. The commit in Spark Streaming is thread-safe and asynchronous, and because Kafka is not transactional, your outputs must still be idempotent. This means that as you consume messages the consumed position keeps advancing, while the commit may show up later: as far as I know, commitAsync only queues the offset ranges, and the direct stream sends them to Kafka when a later batch is generated. As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if called on the result of createDirectStream, not after transformations. The commitAsync call is thread-safe, but it must occur after your outputs if you want meaningful semantics.
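To picture why a commit can show up later than the call to commitAsync, here is a toy model in plain Java. It assumes, as I understand the direct stream's behaviour, that commitAsync only records the offsets and that they are sent when a later batch is generated. The class and method names (DeferredCommitModel, startNextBatch, committedOffset) are made up for illustration and are not Spark APIs.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy model, not Spark code: commitAsync queues offsets, a later batch sends them. */
public class DeferredCommitModel {
    // Commit requests that have been made but not yet sent to the broker.
    private final Queue<long[]> pending = new ArrayDeque<>();
    // What the (simulated) broker stores as the committed offset per partition.
    private final Map<Integer, Long> committed = new HashMap<>();

    /** Models commitAsync: records the request and returns immediately. */
    public void commitAsync(int partition, long untilOffset) {
        pending.add(new long[] {partition, untilOffset});
    }

    /** Models the generation of the next batch: drains the queue and commits. */
    public void startNextBatch() {
        long[] entry;
        while ((entry = pending.poll()) != null) {
            committed.merge((int) entry[0], entry[1], Long::max);
        }
    }

    /** Returns the committed offset for a partition, or -1 if nothing is committed yet. */
    public long committedOffset(int partition) {
        return committed.getOrDefault(partition, -1L);
    }

    public static void main(String[] args) {
        DeferredCommitModel model = new DeferredCommitModel();
        model.commitAsync(0, 2L);                      // batch 1 covered offsets [0, 2)
        System.out.println(model.committedOffset(0));  // prints -1: nothing sent yet
        model.startNextBatch();                        // batch 2 is generated
        System.out.println(model.committedOffset(0));  // prints 2
    }
}
```

In this model the lag only drops once the next batch starts, so with a short batch interval the commit can look almost immediate from the outside.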
You can check how the commit performed by using a callback, as below:
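import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.CanCommitOffsets

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback() {
  override def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit = {
    if (e != null) {
      // The commit failed: log the error once and queue the same ranges again
      logger.error("Error while commitAsync. Retrying: " + e.toString)
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    } else {
      // The commit succeeded: print the committed offset of each partition
      m.asScala.foreach { case (tp, offset) => println("Offset commit:" + tp + "," + offset) }
    }
  }
})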
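Since the commit is not transactional with your output, a batch may be replayed after a failure, which is why the outputs need to be idempotent. A minimal sketch of one way to get that, assuming the sink can be keyed by topic, partition and offset: writing the same record twice then overwrites the row instead of duplicating it. IdempotentSink and the topic name "events" are hypothetical, and the in-memory map only stands in for a real database upsert.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy sink: rows are keyed by topic-partition-offset, so replays overwrite instead of duplicating. */
public class IdempotentSink {
    // Stand-in for a table with a unique key on (topic, partition, offset).
    private final Map<String, String> rows = new HashMap<>();

    /** Upserts one record; writing the same (topic, partition, offset) twice leaves one row. */
    public void write(String topic, int partition, long offset, String value) {
        rows.put(topic + "-" + partition + "-" + offset, value);
    }

    /** Number of distinct rows stored so far. */
    public int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.write("events", 0, 41L, "a");
        sink.write("events", 0, 42L, "b");
        // Simulate a replay after a failed batch: the same offsets arrive again.
        sink.write("events", 0, 41L, "a");
        sink.write("events", 0, 42L, "b");
        System.out.println(sink.size()); // prints 2
    }
}
```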