 

Does commitOffsets on high-level consumer block?

In the Java client (http://kafka.apache.org/documentation.html#highlevelconsumerapi), does commitOffsets() on the high-level consumer block until the offsets are successfully committed, or is it fire-and-forget?

Mattias Petter Johansson asked May 03 '15 11:05


People also ask

Is Kafka consumer multithreaded?

When implementing a multi-threaded consumer architecture, it is important to note that the Kafka consumer is not thread safe. Multi-threaded access must be properly synchronized, which can be tricky. This is why the single-threaded model is commonly used.
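A minimal sketch of the single-threaded model: one thread owns the consumer, and any other thread that needs it submits a task to that owner instead of calling the consumer directly. `FakeConsumer` and `SingleThreadOwner` are hypothetical names and this is not the Kafka client API; the point is only that funneling every call through one executor thread removes the need for locks around a non-thread-safe object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SingleThreadOwner {

    // Hypothetical stand-in for a non-thread-safe consumer (NOT the Kafka API).
    // Its plain, unsynchronized field would race if several threads called poll() directly.
    static final class FakeConsumer {
        private long position = 0;

        List<Long> poll(int max) {
            List<Long> batch = new ArrayList<>();
            for (int i = 0; i < max; i++) {
                batch.add(position++);
            }
            return batch;
        }

        long position() {
            return position;
        }
    }

    private final FakeConsumer consumer = new FakeConsumer();
    // All consumer calls are funneled through this single thread, so no locking is needed.
    private final ExecutorService owner = Executors.newSingleThreadExecutor();

    public Future<List<Long>> poll(int max) {
        return owner.submit(() -> consumer.poll(max));
    }

    public long position() throws InterruptedException, ExecutionException {
        return owner.submit(consumer::position).get();
    }

    public void close() {
        owner.shutdown();
    }

    // Lets `threads` caller threads each request `pollsPerThread` batches, then reports the final position.
    public static long run(int threads, int pollsPerThread, int batch) {
        SingleThreadOwner wrapper = new SingleThreadOwner();
        ExecutorService callers = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                pending.add(callers.submit(() -> {
                    for (int i = 0; i < pollsPerThread; i++) {
                        wrapper.poll(batch).get(); // wait for the owner thread to serve this poll
                    }
                    return null;
                }));
            }
            for (Future<?> f : pending) {
                f.get();
            }
            return wrapper.position();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            callers.shutdown();
            wrapper.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(4, 100, 5)); // 4 threads * 100 polls * 5 records = 2000
    }
}
```

Because every poll is serialized on the owner thread, the final position is always exactly threads × polls × batch size, no matter how the caller threads interleave.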

Can two consumers read from same partition in Kafka?

A consumer can be assigned multiple partitions, but within a consumer group each partition of a topic is assigned to at most one consumer. Multiple consumers in the same group therefore cannot read the same messages from a partition; consumers in different groups can, because each group tracks its own offsets.
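The one-owner-per-partition rule can be pictured as an ownership claim that a second member of the same group cannot win. The sketch below is a hypothetical in-memory registry built on `ConcurrentHashMap.putIfAbsent`, not how Kafka actually stores assignments; the group, topic, and consumer ids are made up for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PartitionOwnership {

    // Hypothetical registry: key = "group/topic/partition", value = owning consumer id.
    private final Map<String, String> owners = new ConcurrentHashMap<>();

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    // Returns true if consumerId now owns the partition (or already did).
    public boolean claim(String group, String topic, int partition, String consumerId) {
        String previous = owners.putIfAbsent(key(group, topic, partition), consumerId);
        return previous == null || previous.equals(consumerId);
    }

    // Releases the partition only if consumerId is the current owner.
    public boolean release(String group, String topic, int partition, String consumerId) {
        return owners.remove(key(group, topic, partition), consumerId);
    }

    public String ownerOf(String group, String topic, int partition) {
        return owners.get(key(group, topic, partition));
    }

    public static void main(String[] args) {
        PartitionOwnership registry = new PartitionOwnership();
        System.out.println(registry.claim("g1", "events", 0, "c1")); // true
        System.out.println(registry.claim("g1", "events", 0, "c2")); // false: same group, already owned
        System.out.println(registry.claim("g2", "events", 0, "c3")); // true: another group is independent
    }
}
```

Keying the claim by group is what lets a second group read the same partition while still excluding a second consumer from the first group.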

What happens if Kafka consumer does not commit?

If a consumer fails before committing, then once it restarts (or its partitions are reassigned) every message after the last committed offset is fetched from Kafka and processed again. This can produce duplicates: some messages from the last poll() call may already have been processed when the failure happened, just before the auto-commit ran.
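The duplicate-delivery effect can be reproduced with a small simulation. It is a simplified model, assuming the consumer commits after every `commitEvery` messages (standing in for the periodic auto-commit) and resumes from the last committed offset after a crash; `simulate` and `duplicates` are hypothetical helpers, not Kafka calls.

```java
import java.util.ArrayList;
import java.util.List;

public class RedeliveryDemo {

    // Simulates a log of `total` messages (offsets 0..total-1).
    // The consumer commits after every `commitEvery` processed messages and
    // crashes after processing the first `crashAfter` messages.
    // Returns the offsets processed across both runs, in order.
    public static List<Integer> simulate(int total, int commitEvery, int crashAfter) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0; // next offset to read after a restart

        // First run: process until the crash.
        for (int offset = 0; offset < total && offset < crashAfter; offset++) {
            processed.add(offset);
            if ((offset + 1) % commitEvery == 0) {
                committed = offset + 1; // commit the next offset to consume
            }
        }
        // Second run: resume from the last committed offset, not from where processing stopped.
        for (int offset = committed; offset < total; offset++) {
            processed.add(offset);
        }
        return processed;
    }

    // Number of offsets that were processed more than once.
    public static int duplicates(List<Integer> processed) {
        return processed.size() - (int) processed.stream().distinct().count();
    }

    public static void main(String[] args) {
        List<Integer> processed = simulate(10, 4, 6);
        System.out.println(processed);             // [0, 1, 2, 3, 4, 5, 4, 5, 6, 7, 8, 9]
        System.out.println(duplicates(processed)); // 2
    }
}
```

Offsets 4 and 5 were processed but not yet committed when the crash happened, so they are delivered a second time; if the crash lands right after a commit, no duplicates appear.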

Can one consumer read from multiple partitions?

When a group has fewer consumers than the topic has partitions, some consumers read from more than one partition. If the group has only one consumer, that consumer reads from all of the topic's partitions (a setup sometimes called an exclusive consumer).
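A simplified round-robin spread of partitions over a group shows both cases. This is an illustrative sketch with a hypothetical `assign` helper, not Kafka's actual assignor logic, which differs in detail.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinAssignment {

    // Simplified round-robin spread of partitions 0..partitions-1 over the given consumers.
    // Each partition goes to exactly one consumer; a consumer may receive several or none.
    public static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            String owner = consumers.get(p % consumers.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("c1"), 4));             // {c1=[0, 1, 2, 3]}
        System.out.println(assign(List.of("c1", "c2"), 4));       // {c1=[0, 2], c2=[1, 3]}
        System.out.println(assign(List.of("c1", "c2", "c3"), 2)); // {c1=[0], c2=[1], c3=[]}
    }
}
```

The last call illustrates the opposite situation: with more consumers than partitions, the extra consumer sits idle, because a partition is never split between two members of the same group.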


1 Answer

Does commitOffsets on the high-level consumer block until offsets are successfully committed?

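It looks like commitOffsets() loops over every partition in the consumer's topic registry and, for each partition whose consumed offset has changed, calls updatePersistentPath, which writes the new offset to ZooKeeper via zkClient.writeData(path, getBytes(data)). These writes are synchronous, so it appears as though commitOffsets() does block until every offset write has been attempted. Note, however, that a failed write is only logged as a warning and is not rethrown, so returning from commitOffsets() does not by itself guarantee that every offset was committed successfully.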

Here is the source code for commitOffsets() (ref):

public void commitOffsets() {
    if (zkClient == null) {
        logger.error("zk client is null. Cannot commit offsets");
        return;
    }
    for (Entry<String, Pool<Partition, PartitionTopicInfo>> e : topicRegistry.entrySet()) {
        ZkGroupTopicDirs topicDirs = new ZkGroupTopicDirs(config.getGroupId(), e.getKey());
        for (PartitionTopicInfo info : e.getValue().values()) {
            final long lastChanged = info.getConsumedOffsetChanged().get();
            if (lastChanged == 0) {
                logger.trace("consume offset not changed");
                continue;
            }
            final long newOffset = info.getConsumedOffset();
            //path: /consumers/<group>/offsets/<topic>/<brokerid-partition>
            final String path = topicDirs.consumerOffsetDir + "/" + info.partition.getName();
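            try {
                // ZkClient calls are synchronous: this returns only after the write completes or throws
                ZkUtils.updatePersistentPath(zkClient, path, "" + newOffset);
            } catch (Throwable t) {
                // The failure is only logged; it is not rethrown to the caller
                logger.warn("exception during commitOffsets, path=" + path + ",offset=" + newOffset, t);
            } finally {
                // Runs even if the write failed, so the "changed" marker is reset regardless
                info.resetComsumedOffsetChanged(lastChanged);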
                if (logger.isDebugEnabled()) {
                    logger.debug("Committed [" + path + "] for topic " + info);
                }
            }
        }
    }
}
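To see why "blocks" is not the same as "committed successfully", here is a stripped-down model of the same control flow. `OffsetStore`, `PartitionState`, and the other names are hypothetical; the store's `write` stands in for ZkUtils.updatePersistentPath, and a simple boolean replaces the original's lastChanged marker.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CommitOffsetsModel {

    // Hypothetical stand-in for ZkUtils.updatePersistentPath: a synchronous write that may throw.
    public interface OffsetStore {
        void write(String path, String data);
    }

    // Simplified per-partition state; the original tracks a "last changed" marker instead of a boolean.
    private static final class PartitionState {
        long consumedOffset;
        boolean changed;

        PartitionState(long consumedOffset) {
            this.consumedOffset = consumedOffset;
            this.changed = true;
        }
    }

    private final OffsetStore store;
    private final Map<String, PartitionState> partitions = new LinkedHashMap<>();
    private int warnings = 0; // counts what the original would send to logger.warn(...)

    public CommitOffsetsModel(OffsetStore store) {
        this.store = store;
    }

    public void consumed(String path, long offset) {
        partitions.put(path, new PartitionState(offset));
    }

    // Same control flow as commitOffsets(): one blocking write per changed
    // partition, failures logged and swallowed, flag reset in finally either way.
    public void commitOffsets() {
        for (Map.Entry<String, PartitionState> e : partitions.entrySet()) {
            PartitionState info = e.getValue();
            if (!info.changed) {
                continue; // offset not changed since the last commit
            }
            try {
                store.write(e.getKey(), Long.toString(info.consumedOffset)); // blocks until done or throws
            } catch (Throwable t) {
                warnings++; // mirrors catch (Throwable t) + logger.warn: never rethrown
            } finally {
                info.changed = false; // cleared even when the write failed
            }
        }
    }

    public int warnings() {
        return warnings;
    }

    public boolean pending(String path) {
        PartitionState s = partitions.get(path);
        return s != null && s.changed;
    }

    public static void main(String[] args) {
        Map<String, String> zk = new LinkedHashMap<>();
        CommitOffsetsModel model = new CommitOffsetsModel((path, data) -> {
            if (path.endsWith("-1")) {
                throw new IllegalStateException("simulated ZooKeeper failure");
            }
            zk.put(path, data);
        });
        model.consumed("/consumers/g/offsets/t/0-0", 42);
        model.consumed("/consumers/g/offsets/t/0-1", 17);
        model.commitOffsets(); // returns normally despite the failed write
        System.out.println(zk);                                          // {/consumers/g/offsets/t/0-0=42}
        System.out.println(model.warnings());                            // 1
        System.out.println(model.pending("/consumers/g/offsets/t/0-1")); // false
    }
}
```

The caller sees a normal return even though one partition's offset never reached the store, and because the flag was cleared in `finally`, that partition is not retried on the next call unless its offset changes again.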

And here is updatePersistentPath(...) (ref):

public static void updatePersistentPath(ZkClient zkClient, String path, String data) {
    try {
        zkClient.writeData(path, getBytes(data));
    } catch (ZkNoNodeException e) {
        createParentPath(zkClient, path);
        try {
            zkClient.createPersistent(path, getBytes(data));
        } catch (ZkNodeExistsException e2) {
            zkClient.writeData(path, getBytes(data));
        }
    }
}
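The write, create, write-again fallback in updatePersistentPath can be exercised against an in-memory node tree. Everything here is a hypothetical model: the methods are named after the ZkClient calls in the quoted code but are local stand-ins, and the two exception classes replace ZkNoNodeException and ZkNodeExistsException.

```java
import java.util.HashMap;
import java.util.Map;

public class PersistentPathUpsert {

    // Hypothetical stand-ins for ZkNoNodeException / ZkNodeExistsException.
    public static class NoNodeException extends RuntimeException {}
    public static class NodeExistsException extends RuntimeException {}

    // Hypothetical in-memory node tree; a real ZooKeeper client talks to a server.
    private final Map<String, String> nodes = new HashMap<>();

    public PersistentPathUpsert() {
        nodes.put("/", "");
    }

    // Like a ZooKeeper data write: the node must already exist.
    public void writeData(String path, String data) {
        if (!nodes.containsKey(path)) throw new NoNodeException();
        nodes.put(path, data);
    }

    // Like a ZooKeeper create: fails if the node exists or its parent is missing.
    public void createPersistent(String path, String data) {
        if (nodes.containsKey(path)) throw new NodeExistsException();
        if (!nodes.containsKey(parent(path))) throw new NoNodeException();
        nodes.put(path, data);
    }

    private static String parent(String path) {
        int i = path.lastIndexOf('/');
        return i <= 0 ? "/" : path.substring(0, i);
    }

    // Creates any missing ancestors of path, top-down.
    private void createParentPath(String path) {
        String parent = parent(path);
        if (nodes.containsKey(parent)) return;
        createParentPath(parent);
        nodes.put(parent, "");
    }

    // Same fallback chain as updatePersistentPath: write, else create, else write again.
    public void updatePersistentPath(String path, String data) {
        try {
            writeData(path, data); // common case: the node already exists
        } catch (NoNodeException e) {
            createParentPath(path);
            try {
                createPersistent(path, data);
            } catch (NodeExistsException e2) {
                writeData(path, data); // someone created it in between; overwrite
            }
        }
    }

    public String read(String path) {
        return nodes.get(path);
    }

    public static void main(String[] args) {
        PersistentPathUpsert zk = new PersistentPathUpsert();
        zk.updatePersistentPath("/consumers/g/offsets/t/0-0", "42"); // creates parents, then the node
        zk.updatePersistentPath("/consumers/g/offsets/t/0-0", "43"); // plain overwrite
        System.out.println(zk.read("/consumers/g/offsets/t/0-0"));         // 43
        System.out.println(zk.read("/consumers/g/offsets") != null);       // true
    }
}
```

The inner catch only matters when another writer creates the node between the failed write and the create; this single-threaded demo never hits that branch, but the first commit for a new partition does go through the create path.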
Drakes answered Sep 21 '22 03:09