A quote from "Kafka: The Definitive Guide" (https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1):
The drawback is that while commitSync() will retry the commit until it either succeeds or encounters a non-retriable failure, commitAsync() will not retry.
This phrase is not clear to me. I suppose that the consumer sends a commit request to the broker, and if the broker doesn't respond within some timeout, the commit is considered failed. Am I wrong?
Can you clarify the difference between commitSync and commitAsync in detail?
Also, please describe the use cases in which I should prefer each commit type.
The consumer can either automatically commit offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs (e.g. commitSync and commitAsync ). This distinction gives the consumer control over when a record is considered consumed.
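As a minimal sketch of the two modes described above, the choice between automatic and manual commits is made through consumer configuration. The keys `enable.auto.commit` and `auto.commit.interval.ms` are the standard consumer settings; the class and method names (`CommitModeConfig`, `manualCommit`, `autoCommit`) are hypothetical and only there for illustration.

```java
import java.util.Properties;

// Hypothetical helper: builds only the commit-related part of a consumer config.
class CommitModeConfig {

    // Manual mode: the application calls commitSync()/commitAsync() itself.
    static Properties manualCommit() {
        Properties props = new Properties();
        props.put("enable.auto.commit", "false");
        return props;
    }

    // Automatic mode: the consumer commits periodically at the given interval.
    static Properties autoCommit(long intervalMs) {
        Properties props = new Properties();
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", Long.toString(intervalMs));
        return props;
    }
}
```

These properties would be merged with the rest of the consumer configuration (bootstrap servers, group id, deserializers) before the consumer is created.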
commitAsync. public void commitAsync(OffsetCommitCallback callback) Commit offsets returned on the last poll() for the subscribed list of topics and partitions. This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup.
The Kafka consumer commits the offset periodically when polling batches, as described above. This strategy works well if the message processing is synchronous and failures are handled gracefully. Be aware that starting with Quarkus 1.9, auto commit is disabled by default, so you need to enable it explicitly.
The Kafka consumer poll() method fetches records in sequential order from a specified topic/partitions. This poll() method is how Kafka clients read data from Kafka. When the poll() method is called, the consumer will fetch records from the last consumed offset.
As the API documentation for commitSync says:
This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown to the caller).
That means commitSync is a blocking method: calling it blocks your thread until the commit either succeeds or fails.
For example:
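while (true) {
    // poll(Duration) replaces the deprecated poll(long); requires java.time.Duration
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // Blocks here until the commit succeeds or throws.
        // Note: the no-argument form commits the offsets returned by the last poll(), not just this record's offset.
        consumer.commitSync();
    }
}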
In each iteration of the for-loop, your code moves on to the next iteration only after consumer.commitSync() returns successfully or throws an exception.
The API documentation for commitAsync says:
This is an asynchronous call and will not block. Any errors encountered are either passed to the callback (if provided) or discarded.
That means commitAsync is a non-blocking method: calling it does not block your thread. Your code continues with the following instructions, no matter whether the commit eventually succeeds or fails.
For example, similar to the previous example, but here we use commitAsync:
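while (true) {
    // poll(Duration) replaces the deprecated poll(long); requires java.time.Duration
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // Returns immediately; the outcome is reported to the callback later.
        consumer.commitAsync(callback);
    }
}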
In each iteration of the for-loop, your code moves on to the next iteration no matter what eventually happens to consumer.commitAsync(). The result of the commit is handled by the callback function you defined.
Trade-offs: latency vs. data consistency
If data consistency matters most, choose commitSync(), because it makes sure that, before taking any further action, you know whether the offset commit succeeded or failed. But because it is synchronous and blocking, you spend more time waiting for the commit to finish, which leads to higher latency.
If low latency matters most and you can tolerate some inconsistency, choose commitAsync(), because it does not wait for the commit to finish. Instead, it just sends the commit request and handles the response from Kafka (success or failure) later, while your code continues executing.
This is all generally speaking; the actual behaviour depends on your code and where you call the method.
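To make the ordering difference concrete, here is a small broker-free simulation. It is only a sketch: it does not use the Kafka client at all, a CompletableFuture stands in for the broker's acknowledgement, and all names (`CommitStyleSketch`, `runSync`, `runAsync`) are hypothetical. It records the order of events for a blocking and a non-blocking commit style.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Simulation only: no Kafka classes are involved.
class CommitStyleSketch {

    // Blocking style: wait for each simulated acknowledgement before the next record.
    static List<String> runSync(int recordCount) {
        List<String> events = new ArrayList<>();
        for (int i = 0; i < recordCount; i++) {
            events.add("process " + i);
            CompletableFuture<Void> commit = new CompletableFuture<>();
            commit.complete(null); // simulated broker acknowledgement
            commit.join();         // block until acknowledged
            events.add("commit " + i + " acknowledged");
        }
        return events;
    }

    // Non-blocking style: register a callback and move on to the next record.
    static List<String> runAsync(int recordCount) {
        List<String> events = new ArrayList<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < recordCount; i++) {
            final int n = i;
            events.add("process " + n);
            CompletableFuture<Void> commit = new CompletableFuture<>();
            commit.thenRun(() -> events.add("commit " + n + " acknowledged"));
            pending.add(commit);
        }
        // Simulated acknowledgements arriving after all records were processed.
        for (CompletableFuture<Void> commit : pending) {
            commit.complete(null);
        }
        return events;
    }
}
```

In the blocking run, every "process" event is followed by its acknowledgement. In the non-blocking run, all records are processed first and the acknowledgements arrive afterwards, which is why a failure is only noticed later, in the callback.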
In the book "Kafka - The Definitive Guide", there is a hint on how to mitigate the potential problem of committing lower offsets due to an asynchronous commit:
Retrying Async Commits: A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the sequence number at the time of the commit to the commitAsync callback. When you’re getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don’t retry because a newer commit was already sent.
The following code depicts a possible solution:
import java.time.Duration
import java.util
import java.util.Properties
import java.util.concurrent.atomic.AtomicLong
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import scala.collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // define topic
  val topic = "myOutputTopic"

  // set properties
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter")
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // [set more properties...]

  // create KafkaConsumer and subscribe
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // initialize global counter; incremented once per new commit
  val atomicLong = new AtomicLong(0)

  // consume messages
  try {
    while (true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala
      if (records.nonEmpty) {
        for (data <- records) {
          // do something with the records
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }
    }
  } catch {
    case ex: KafkaException => ex.printStackTrace()
  } finally {
    // final blocking commit before shutting down
    consumer.commitSync()
    consumer.close()
  }

  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // keeping position of this callback instance
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // retrying only if no other commit incremented the global counter
      if (exception != null) {
        if (position == atomicLong.get) {
          consumer.commitAsync(this)
        }
      }
    }
  }
}
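The retry decision from the quoted pattern can also be looked at in isolation, without a broker. The following is a minimal Java sketch of just that check; the class and method names (`CommitSequence`, `next`, `safeToRetry`) are hypothetical and not part of the Kafka API.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: tracks the sequence number of the most recent commit.
class CommitSequence {
    private final AtomicLong latest = new AtomicLong(0);

    // Call once for every new commit; hand the returned number to that commit's callback.
    long next() {
        return latest.incrementAndGet();
    }

    // A failed commit may be retried only if no newer commit was sent in the meantime.
    boolean safeToRetry(long sequenceOfFailedCommit) {
        return sequenceOfFailedCommit == latest.get();
    }
}
```

For example, if commit 1 fails after commit 2 has already been sent, `safeToRetry(1)` returns false, so the older (lower) offsets are not committed over the newer ones.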
Both commitSync and commitAsync use Kafka's offset management feature, and both have drawbacks. Processing a message and committing its offset are not atomic: if processing succeeds but the offset commit fails, and a partition rebalance happens at the same time, the message is processed again (duplicate processing) by another consumer. If you are okay with duplicate processing, you can go with commitAsync: it does not block, so latency is low, and a later commit with a higher offset covers an earlier failed one. Otherwise, use custom offset management that processes the message and updates the offset atomically (for example, with an external offset store).
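The idea behind an external offset store can be sketched as follows. This is an in-memory stand-in under loud assumptions: a single partition, and a synchronized method playing the role of what would be a database transaction in practice. The class and method names (`InMemoryOffsetStore`, `saveIfNew`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory store for a single partition; a real one would be a transactional database.
class InMemoryOffsetStore {
    private final List<String> results = new ArrayList<>();
    private long nextOffset = 0;

    // Stores the result and advances the offset in one step.
    // Returns false for a redelivered record whose offset was already stored.
    synchronized boolean saveIfNew(long offset, String result) {
        if (offset < nextOffset) {
            return false; // duplicate delivery, already processed
        }
        results.add(result);
        nextOffset = offset + 1;
        return true;
    }

    // Offset to resume from after a restart or rebalance.
    synchronized long nextOffset() {
        return nextOffset;
    }

    synchronized List<String> results() {
        return new ArrayList<>(results);
    }
}
```

Because the result and the offset change together, a record that is redelivered after a rebalance is detected and skipped. With a real consumer, the stored offset would typically be passed to seek() when partitions are assigned.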