
Kafka Rebalancing. Duplicate processing issue

I have a consumer worker application that internally launches X threads, each of which creates its own KafkaConsumer. The consumers share the same groupId and are subscribed to the same topics, so each consumer gets its fair share of partitions.

The nature of the processing is such that I can neither lose messages nor allow duplicates. I'm running Kafka 0.10.2.1.

Here is the problem I am facing: consumer thread 1 starts consuming and gets a batch of messages from poll(). I also implement a ConsumerRebalanceListener, and every time a message is successfully processed its offset is added to the listener's offsets map (see the code below), so that when a rebalance happens I can commit my offsets before my partitions are reassigned to another consumer. Sometimes processing a batch takes longer than max.poll.interval.ms; a rebalance then occurs, and the partition is taken from consumer 1 and assigned to consumer 2. Consumer 1 doesn't know that the partition was revoked and keeps processing messages, while consumer 2 picks up from the last committed offset (committed by the RebalanceListener) and processes the same messages.
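One common mitigation, not part of the original question, is to bound the batch size so that processing a full batch reliably finishes within max.poll.interval.ms. The sketch below uses the real consumer config keys max.poll.records and max.poll.interval.ms, but the class name PollSizing, the worst-case per-record time and the safety margin are hypothetical inputs you would measure and choose yourself. It uses only the JDK so it runs without kafka-clients.

```java
import java.util.Properties;

/** Hypothetical helper: sizes a poll batch so it fits inside max.poll.interval.ms. */
final class PollSizing {

    private PollSizing() {}

    /**
     * Largest batch that can be processed within the poll interval, using only
     * a fraction (margin, 0 < margin <= 1) of the interval as the time budget.
     */
    static int safeMaxPollRecords(long maxPollIntervalMs, long worstCaseMsPerRecord, double margin) {
        if (worstCaseMsPerRecord <= 0 || margin <= 0 || margin > 1) {
            throw new IllegalArgumentException("invalid sizing inputs");
        }
        long budgetMs = (long) (maxPollIntervalMs * margin);
        // Always allow at least one record per poll.
        return (int) Math.max(1, budgetMs / worstCaseMsPerRecord);
    }

    /** Consumer properties using the real Kafka config keys (values are examples). */
    static Properties consumerProps(String groupId, long maxPollIntervalMs, int maxPollRecords) {
        Properties props = new Properties();
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false"); // offsets are committed manually
        props.put("max.poll.interval.ms", String.valueOf(maxPollIntervalMs));
        props.put("max.poll.records", String.valueOf(maxPollRecords));
        return props;
    }
}
```

For example, with the default max.poll.interval.ms of 300000 ms and an assumed worst case of 2 s per record, a 50% margin gives max.poll.records = 75. This lowers the chance of a rebalance in the middle of a batch, but rebalances can still happen for other reasons (a consumer joining or leaving), so it does not remove the ownership problem described here.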

Is there a way to inform a consumer that its partitions have been revoked, so that it can stop processing messages in its loop that have already been assigned to another consumer?

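```java
import static java.util.stream.Collectors.joining;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

import com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RebalanceListener<K, V> implements ConsumerRebalanceListener {

    private final KafkaConsumer<K, V> consumer;

    // static: one map shared by all consumer threads in this JVM
    private static final ConcurrentMap<TopicPartition, OffsetAndMetadata> CURRENT_OFFSETS =
            Maps.newConcurrentMap();

    private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceListener.class);

    public RebalanceListener(KafkaConsumer<K, V> consumer) {
        this.consumer = consumer;
    }

    // Called after each successfully processed record. Kafka treats the committed
    // offset as the NEXT record to read, so the caller should pass lastProcessedOffset + 1.
    public void addOffset(String topic, int partition, long offset) {
        LOGGER.debug("message=Adding offset to offsets map, topic={}, partition={}, offset={}",
                topic, partition, offset);
        CURRENT_OFFSETS.put(new TopicPartition(topic, partition),
                new OffsetAndMetadata(offset, "commit"));
    }

    public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
        return CURRENT_OFFSETS;
    }

    // Invoked on this consumer's own thread, from inside poll(), when a rebalance
    // is about to take partitions away from it.
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        LOGGER.debug("message=following partitions have been revoked from consumer: [{}]",
                partitions.stream().map(
                        topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
                        .collect(joining(",")));
        LOGGER.debug("message=Committing offsets for partitions [{}]",
                CURRENT_OFFSETS.keySet().stream().map(
                        topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
                        .collect(joining(",")));
        // Commits every entry in the shared map, not only the revoked partitions.
        consumer.commitSync(CURRENT_OFFSETS);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        LOGGER.debug("message=following partitions have been assigned to consumer: [{}]",
                partitions.stream().map(
                        topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
                        .collect(joining(",")));
    }

}
```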

I think I could keep a concurrent map of consumerId -> TopicPartition, maintained inside the RebalanceListener, and before processing each message check whether the current consumer is still associated with the record's partition (each ConsumerRecord has topic and partition fields). If not, break out of the loop and call poll() again.
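A minimal sketch of this single-instance idea, assuming a hypothetical PartitionOwnership class shared by all consumer threads in one JVM. Partitions are keyed as "topic:partition" strings instead of TopicPartition so the example runs without kafka-clients; in real code, onAssigned and onRevoked would be called from the listener's onPartitionsAssigned and onPartitionsRevoked, and isOwnedBy before handling each ConsumerRecord.

```java
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical in-process registry: partition key ("topic:partition") -> owning consumer id.
 * Shared by all consumer threads of ONE worker instance; it cannot see other instances.
 */
final class PartitionOwnership {

    private final ConcurrentMap<String, String> ownerByPartition = new ConcurrentHashMap<>();

    static String key(String topic, int partition) {
        return topic + ":" + partition;
    }

    /** Call from onPartitionsAssigned: the new owner overwrites any previous owner. */
    void onAssigned(String consumerId, Collection<String> partitionKeys) {
        for (String k : partitionKeys) {
            ownerByPartition.put(k, consumerId);
        }
    }

    /** Call from onPartitionsRevoked: only clear entries this consumer still owns. */
    void onRevoked(String consumerId, Collection<String> partitionKeys) {
        for (String k : partitionKeys) {
            ownerByPartition.remove(k, consumerId); // atomic compare-and-remove
        }
    }

    /** Check before processing each record; false means "stop the loop and poll() again". */
    boolean isOwnedBy(String consumerId, String topic, int partition) {
        return consumerId.equals(ownerByPartition.get(key(topic, partition)));
    }
}
```

onRevoked uses ConcurrentMap.remove(key, value) because the old owner's revocation callback only runs on its next poll(), which can be after the new owner has registered; a plain remove would erase the new owner's entry. The check narrows the duplicate window but does not close it: a record whose check passed just before the new owner registered can still be processed by both threads.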

This would be viable if my worker app ran as a single instance, even with several KafkaConsumer threads. But once I scale it out, I won't be able to keep the offsets and the consumer-to-TopicPartition mapping in static maps; they would have to live in some kind of centralized storage, such as a database or Redis.

But then, before processing each item, I would have to check whether the record can legitimately be processed by the current consumer thread. With a scaled-out worker app, that means a network call to the external storage for every record, which would slow processing down enough to defeat the purpose of using Kafka. I might instead just commit offsets after each item is processed.
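A minimal sketch of the commit-after-every-item fallback. PerRecordCommitLoop, Rec and OffsetSink are hypothetical stand-ins so the example runs on the JDK alone; with the real client, the sink call would correspond to consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1))).

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of "process one record, then commit it" (no Kafka dependency). */
final class PerRecordCommitLoop {

    /** Minimal stand-in for a ConsumerRecord: only what the loop needs. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Stand-in for a synchronous commit; the committed value is the NEXT offset to read. */
    interface OffsetSink {
        void commit(String topic, int partition, long nextOffset);
    }

    static void processBatch(List<Rec> batch, Consumer<Rec> handler, OffsetSink sink) {
        for (Rec r : batch) {
            handler.accept(r);                               // process first...
            sink.commit(r.topic, r.partition, r.offset + 1); // ...then commit offset + 1
        }
    }
}
```

This shrinks the duplicate window to at most one record per partition (processed but not yet committed when the partition moves), at the cost of a synchronous commit round-trip per record; it does not give exactly-once processing by itself. In the real client, commitSync throwing CommitFailedException is one sign that the group has already rebalanced and this consumer should stop working on its current batch.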

Ihor M. asked Nov 03 '17 at 19:11



People also ask

How do I get rid of duplicate messages in Kafka?

Write an idempotent message handler. It's the easiest way to deal with duplicate messages. A message handler is idempotent if calling it multiple times with the same payload has no additional effect. For example, modifying an already modified Order with the same payload should give the same result.
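A minimal sketch of an idempotent handler, assuming each message carries a unique ID (for example an order ID); IdempotentHandler is a hypothetical name. The set of seen IDs is kept in memory only, so it would not survive a restart or be visible to other instances; a real deployment would need durable storage updated atomically with the side effect.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical idempotent wrapper: skips messages whose id has already been processed. */
final class IdempotentHandler<T> {

    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<T> delegate;

    IdempotentHandler(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    /** Returns true if the message was processed, false if it was a duplicate. */
    boolean handle(String messageId, T payload) {
        if (!processedIds.add(messageId)) {
            return false; // already seen: no additional effect
        }
        try {
            delegate.accept(payload);
        } catch (RuntimeException e) {
            processedIds.remove(messageId); // allow a retry if processing failed
            throw e;
        }
        return true;
    }
}
```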

What happens during Kafka rebalance?

During a rebalance, every consumer that's still in communication with the group coordinator must revoke and then regain its partitions, for all partitions in its assignment. More partitions to manage means a longer wait, as all the consumers in the group take time to manage those relationships.

What causes Kafka rebalancing?

A Kafka rebalance happens when a consumer joins or leaves the consumer group, including when a consumer is considered failed because it did not call poll() within max.poll.interval.ms. It becomes especially disruptive during a deployment rollout, when multiple instances restart at the same time and rebalance latency increases significantly.


1 Answer

You need to implement onPartitionsRevoked()

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsRevoked(java.util.Collection)

It is guaranteed that all consumer processes will invoke onPartitionsRevoked prior to any process invoking onPartitionsAssigned. So if offsets or other state is saved in the onPartitionsRevoked call it is guaranteed to be saved by the time the process taking over that partition has their onPartitionsAssigned callback called to load the state.
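A minimal sketch of a per-consumer offset tracker for use with onPartitionsRevoked, assuming one hypothetical PartitionOffsets instance per KafkaConsumer thread instead of a static map. It stores lastProcessedOffset + 1 (the next offset to read, which is what Kafka expects to be committed) and, on revocation, returns offsets only for the revoked partitions. Keys are "topic:partition" strings so it runs without kafka-clients; in real code the returned map would be converted to a Map<TopicPartition, OffsetAndMetadata> and passed to consumer.commitSync(...).

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical per-consumer offset tracker (one instance per consumer thread, not static).
 * Keys stand in for TopicPartition as "topic:partition".
 */
final class PartitionOffsets {

    // A plain HashMap is enough: the processing loop and the rebalance callback
    // both run on the consumer's own thread.
    private final Map<String, Long> nextOffsets = new HashMap<>();

    /** Record a processed offset; keeps the highest next-offset seen per partition. */
    void markProcessed(String topic, int partition, long offset) {
        nextOffsets.merge(topic + ":" + partition, offset + 1, Long::max);
    }

    /**
     * Call from onPartitionsRevoked: returns offsets only for the revoked partitions
     * and forgets them, so this consumer never commits for partitions it no longer owns.
     */
    Map<String, Long> drainRevoked(Collection<String> revokedKeys) {
        Map<String, Long> toCommit = new HashMap<>();
        for (String k : revokedKeys) {
            Long next = nextOffsets.remove(k);
            if (next != null) {
                toCommit.put(k, next);
            }
        }
        return toCommit;
    }
}
```

This still depends on the rebalance callback, which the client only invokes from inside poll(), so on its own it does not interrupt a batch that overruns max.poll.interval.ms.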

Hans Jespersen answered Nov 18 '22 at 07:11
