In the Java client (http://kafka.apache.org/documentation.html#highlevelconsumerapi), does commitOffsets on the high-level consumer block until offsets are successfully committed, or is it fire-and-forget?
When implementing a multi-threaded consumer architecture, it is important to note that the Kafka consumer is not thread safe. Multi-threaded access must be properly synchronized, which can be tricky. This is why the single-threaded model is commonly used.
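The single-threaded model can be sketched as follows. This is only an illustration, not Kafka code: `FakeConsumer` is a hypothetical stand-in that rejects calls from a second thread, and the "processing" is a placeholder multiplication. Only the calling thread ever touches the consumer; the records it polls are handed to a worker pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SingleThreadedPollDemo {

    // Hypothetical stand-in for a consumer that is not thread safe.
    // It remembers the first thread that polls and rejects any other.
    static final class FakeConsumer {
        private Thread owner;
        private int next = 0;
        private final int total;

        FakeConsumer(int total) {
            this.total = total;
        }

        List<Integer> poll(int max) {
            Thread caller = Thread.currentThread();
            if (owner == null) {
                owner = caller;
            } else if (owner != caller) {
                throw new IllegalStateException("consumer used from a second thread");
            }
            List<Integer> batch = new ArrayList<>();
            while (batch.size() < max && next < total) {
                batch.add(next++);
            }
            return batch;
        }
    }

    // Polls on the calling thread only; workers receive records, never the consumer.
    static int run(int totalRecords, int workers) {
        FakeConsumer consumer = new FakeConsumer(totalRecords);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            List<Integer> batch;
            while (!(batch = consumer.poll(10)).isEmpty()) {
                for (Integer record : batch) {
                    Callable<Integer> task = () -> record * 2; // placeholder processing
                    results.add(pool.submit(task));
                }
            }
            int sum = 0;
            for (Future<Integer> f : results) {
                sum += f.get();
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Keeping the consumer confined to one thread avoids the synchronization problem entirely; only the per-record work runs in parallel.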
A consumer can be assigned multiple partitions. Within a consumer group, however, each partition of a topic is assigned to only one consumer, so two consumers in the same group cannot read the same message from a partition.
If a consumer fails before a commit, all messages after the last commit are received from Kafka and processed again. This retry can produce duplicates, because some messages from the last poll() call may already have been processed when the failure happened, right before the auto commit call.
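The duplicate scenario can be simulated without Kafka. The sketch below is a toy model under stated assumptions: the log is just the offsets 0 to `logSize - 1`, a commit happens once per fully processed batch, and `crashAfterProcessing` is a made-up parameter that injects a single failure.

```java
import java.util.ArrayList;
import java.util.List;

class AtLeastOnceSketch {

    // Returns every offset that was processed, in order, across one crash and restart.
    static List<Integer> run(int logSize, int batchSize, int crashAfterProcessing) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0;       // last committed position
        boolean crashed = false; // the simulated failure happens at most once

        while (committed < logSize) {
            int end = Math.min(committed + batchSize, logSize);
            boolean failedThisBatch = false;
            for (int offset = committed; offset < end; offset++) {
                processed.add(offset);
                if (!crashed && processed.size() == crashAfterProcessing) {
                    crashed = true;
                    failedThisBatch = true; // failure before the commit for this batch
                    break;
                }
            }
            if (failedThisBatch) {
                continue; // restart: resume from the last committed position
            }
            committed = end; // commit only after the whole batch was processed
        }
        return processed;
    }
}
```

With `run(6, 3, 5)` the crash hits after offsets 3 and 4 were processed but before their batch was committed, so both are processed a second time after the restart.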
When there are fewer consumers than partitions, some consumers read messages from more than one partition. In your scenario, a single consumer reads from all of your partitions. This type of consumer is known as an exclusive consumer, and it occurs when a consumer group has only one consumer.
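A rough sketch of how partitions spread over the consumers in one group. It uses a simple round-robin rule as an assumption for illustration; it is not Kafka's actual assignment code, and the consumer names are made up. Each partition ends up with exactly one owner, and with a single consumer that owner gets every partition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartitionAssignmentSketch {

    // Assigns partitions 0..partitionCount-1 to consumers in round-robin order.
    // Every partition gets exactly one owner within the group.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment; // nobody to assign to
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }
}
```

Under this rule, one consumer and three partitions gives that consumer partitions 0, 1 and 2, while four consumers and three partitions leaves the fourth consumer with nothing to read.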
Does commitOffsets on the high-level consumer block until offsets are successfully committed?
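It looks like commitOffsets() loops over the partitions of each topic and, for every partition whose consumed offset has changed, calls updatePersistentPath, which writes the data via zkClient.writeData(path, getBytes(data)). So it appears as though commitOffsets() does block until all the offsets are written. Note that an exception during a write is caught and logged as a warning rather than rethrown, so a normal return means every write was attempted, not necessarily that every one succeeded.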
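To make the distinction in the question concrete, here is a minimal sketch of the two styles. It is not the consumer's code: the in-memory map stands in for ZooKeeper, and the class, method names and paths are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class CommitStyleSketch {

    // In-memory stand-in for the offset store (an assumption for this sketch).
    private final Map<String, Long> store = new ConcurrentHashMap<>();

    // Blocking style: the write has completed by the time this method returns.
    void commitSync(String path, long offset) {
        store.put(path, offset);
    }

    // Fire-and-forget style: the write runs on another thread, and the caller
    // only knows it finished if it waits on the returned future.
    CompletableFuture<Void> commitAsync(String path, long offset) {
        return CompletableFuture.runAsync(() -> store.put(path, offset));
    }

    Long read(String path) {
        return store.get(path);
    }
}
```

After `commitSync` returns, `read` is guaranteed to see the new offset; after `commitAsync` that is only guaranteed once the future has completed, for example after calling `join()` on it.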
Here is the source code for commitOffsets() (ref):
public void commitOffsets() {
    if (zkClient == null) {
        logger.error("zk client is null. Cannot commit offsets");
        return;
    }
    for (Entry<String, Pool<Partition, PartitionTopicInfo>> e : topicRegistry.entrySet()) {
        ZkGroupTopicDirs topicDirs = new ZkGroupTopicDirs(config.getGroupId(), e.getKey());
        for (PartitionTopicInfo info : e.getValue().values()) {
            final long lastChanged = info.getConsumedOffsetChanged().get();
            if (lastChanged == 0) {
                logger.trace("consume offset not changed");
                continue;
            }
            final long newOffset = info.getConsumedOffset();
            //path: /consumers/<group>/offsets/<topic>/<brokerid-partition>
            final String path = topicDirs.consumerOffsetDir + "/" + info.partition.getName();
            try {
                ZkUtils.updatePersistentPath(zkClient, path, "" + newOffset);
            } catch (Throwable t) {
                logger.warn("exception during commitOffsets, path=" + path + ",offset=" + newOffset, t);
            } finally {
                info.resetComsumedOffsetChanged(lastChanged);
                if (logger.isDebugEnabled()) {
                    logger.debug("Committed [" + path + "] for topic " + info);
                }
            }
        }
    }
}
And here is the source for updatePersistentPath(...) (ref):
public static void updatePersistentPath(ZkClient zkClient, String path, String data) {
    try {
        zkClient.writeData(path, getBytes(data));
    } catch (ZkNoNodeException e) {
        createParentPath(zkClient, path);
        try {
            zkClient.createPersistent(path, getBytes(data));
        } catch (ZkNodeExistsException e2) {
            zkClient.writeData(path, getBytes(data));
        }
    }
}
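The write, then create, then write-again fallback in updatePersistentPath can be imitated with a plain `ConcurrentMap`. This is only an analogy under stated assumptions: the map plays the role of ZooKeeper, and the returned strings are invented here to show which branch was taken.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class UpsertSketch {

    // Mirrors the three branches: update an existing node, create a missing one,
    // or fall back to an update if another writer created it in between.
    static String updatePersistent(ConcurrentMap<String, String> store, String path, String data) {
        if (store.replace(path, data) != null) {
            return "updated"; // node existed, plain write succeeded
        }
        if (store.putIfAbsent(path, data) == null) {
            return "created"; // node was missing and we created it
        }
        store.put(path, data); // someone else created it first, so write again
        return "updated-after-race";
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> store = new ConcurrentHashMap<>();
        // The path below is a made-up example value.
        System.out.println(updatePersistent(store, "/consumers/g/offsets/t/0-0", "10"));
        System.out.println(updatePersistent(store, "/consumers/g/offsets/t/0-0", "20"));
    }
}
```

Each step is a synchronous call, which matches the observation above that the caller does not get control back until the write has been attempted.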