I have a consumer worker application that internally launches X
threads, each of which spawns its own KafkaConsumer. The consumers share the same groupId
and are subscribed to the same topics, so that each consumer gets its fair share of partitions.
The nature of the processing is that I can neither lose messages nor allow duplicates. The Kafka version I'm running is 0.10.2.1.
Here is the problem I am facing: consumer thread 1 starts consuming messages and on poll()
gets a batch of messages. I also implement ConsumerRebalanceListener
, so that every time a message is successfully processed it gets added to the offsets
map. (See code below.) That way, once rebalancing happens, I can commit my offsets before my partitions get reassigned to another consumer.
Sometimes processing a batch takes longer than max.poll.interval.ms
, and this is when rebalancing occurs: a partition is pulled from consumer 1 and assigned to consumer 2. Consumer 1 doesn't know the partition was revoked and keeps on processing messages; in the meantime consumer 2 picks up from the last offset (the one committed by the RebalanceListener) and processes the same messages.
Is there a way to inform a consumer that its partitions have been revoked, so that it can stop processing messages in the loop that have already been assigned to another consumer?
import static java.util.stream.Collectors.joining;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

import com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RebalanceListener<K, V> implements ConsumerRebalanceListener {

    private static final ConcurrentMap<TopicPartition, OffsetAndMetadata> CURRENT_OFFSETS =
            Maps.newConcurrentMap();
    private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceListener.class);

    private final KafkaConsumer<K, V> consumer;

    public RebalanceListener(KafkaConsumer<K, V> consumer) {
        this.consumer = consumer;
    }

    public void addOffset(String topic, int partition, long offset) {
        LOGGER.debug("message=Adding offset to offsets map, topic={}, partition={}, offset={}",
                topic, partition, offset);
        CURRENT_OFFSETS.put(new TopicPartition(topic, partition),
                new OffsetAndMetadata(offset, "commit"));
    }

    public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
        return CURRENT_OFFSETS;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        LOGGER.debug("message=following partitions have been revoked from consumer: [{}]",
                partitions.stream()
                        .map(topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
                        .collect(joining(",")));
        LOGGER.debug("message=Committing offsets for partitions [{}]",
                CURRENT_OFFSETS.keySet().stream()
                        .map(topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
                        .collect(joining(",")));
        consumer.commitSync(CURRENT_OFFSETS);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        LOGGER.debug("message=following partitions have been assigned to consumer: [{}]",
                partitions.stream()
                        .map(topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
                        .collect(joining(",")));
    }
}
I think I could keep a concurrent map of consumerId -> TopicPartition
inside the RebalanceListener
, and then before processing each message check whether the current consumer is still associated with the record (each ConsumerRecord
has topic
and partition
fields).
If not, break out of the loop and make the next poll()
.
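That per-record ownership check can be sketched without any Kafka dependencies. Here plain "topic:partition" strings stand in for Kafka's TopicPartition, and the tracker is what a hypothetical RebalanceListener would update from onPartitionsAssigned()/onPartitionsRevoked(); this is a minimal sketch of the bookkeeping, not a complete solution.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Tracks which partitions this consumer thread currently owns.
// onPartitionsAssigned() would call assign(); onPartitionsRevoked() would call revoke().
public class PartitionOwnershipTracker {
    private final Set<String> owned = ConcurrentHashMap.newKeySet();

    public void assign(String topic, int partition) {
        owned.add(topic + ":" + partition);
    }

    public void revoke(String topic, int partition) {
        owned.remove(topic + ":" + partition);
    }

    // Checked before processing each ConsumerRecord: if false, the processing
    // loop breaks and calls poll() again to pick up the new assignment.
    public boolean stillOwns(String topic, int partition) {
        return owned.contains(topic + ":" + partition);
    }
}
```

Because the listener callbacks run inside poll() on the same thread, a long-running processing loop only observes the revocation on its next poll(); the tracker merely stops it from touching records for partitions it has already lost.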
This would be a viable solution if my worker app were running as a single instance, even with several KafkaConsumer threads spinning. But once I scale it out, I won't be able to stash offsets and the consumer-to-TopicPartition mapping in static maps. That would have to be some kind of centralized storage: a database or, say, Redis.
But then, every time before I process an item, I would have to ask whether the record can legitimately be processed by the current consumer thread. In a scaled-out worker app that check becomes a network call to external storage, which would defeat the purpose of using Kafka, as it would slow down processing. I might just opt to commit offsets after every single item is processed.
2.1 Write an idempotent message handler. This is the easiest way to deal with duplicate messages. A message handler is idempotent if calling it multiple times with the same payload has no additional effect. For example, modifying an already-modified Order with the same payload should give the same result.
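One common way to get idempotence is to record the IDs of messages already applied and skip repeats. The sketch below is an illustration under assumptions, not a production recipe: the message-ID field and the in-memory set are stand-ins for whatever unique key and durable store (a database unique constraint, Redis SETNX) the real handler would use.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Idempotent handler sketch: handling the same message twice has no extra effect.
public class IdempotentOrderHandler {
    private final Set<String> appliedMessageIds = ConcurrentHashMap.newKeySet();
    private int applyCount = 0; // stand-in for the real "modify Order" side effect

    // Returns true if the message was applied, false if it was a duplicate.
    public synchronized boolean handle(String messageId, String payload) {
        if (!appliedMessageIds.add(messageId)) {
            return false; // already processed: skip all side effects
        }
        applyCount++;
        return true;
    }

    public synchronized int getApplyCount() {
        return applyCount;
    }
}
```

With such a handler, the duplicate delivery after a rebalance becomes harmless: the second consumer re-reads the records, but only the first delivery of each message changes state.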
During a rebalance event, every consumer that is still in communication with the group coordinator must revoke and then regain all partitions in its assignment. More partitions to manage means more time spent waiting while all the consumers in the group re-establish those relationships.
A Kafka rebalance happens when a new consumer joins the consumer group or an existing one leaves. This becomes dramatic during an application service deployment rollout, when multiple instances restart at the same time and rebalance latency increases significantly.
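Since the root cause here is batches that outlive max.poll.interval.ms, one mitigation available in 0.10.2.x is to shrink each batch so processing finishes between polls. The property names below are real Kafka consumer configs; the values are purely illustrative, not recommendations.

```java
import java.util.Properties;

public class ConsumerTuning {
    // Illustrative settings for a consumer whose per-record processing is slow:
    // fewer records per poll() keeps each batch under max.poll.interval.ms.
    public static Properties slowProcessingProps() {
        Properties props = new Properties();
        props.put("max.poll.records", "50");          // default is 500; smaller batches finish sooner
        props.put("max.poll.interval.ms", "600000");  // allow up to 10 minutes between polls
        props.put("session.timeout.ms", "30000");     // heartbeat-based liveness, separate from the poll interval
        props.put("enable.auto.commit", "false");     // commit manually after processing
        return props;
    }
}
```

These would be merged into the usual bootstrap/group/deserializer settings before constructing the KafkaConsumer.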
You need to implement onPartitionsRevoked()
https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsRevoked(java.util.Collection)
It is guaranteed that all consumer processes will invoke onPartitionsRevoked prior to any process invoking onPartitionsAssigned. So if offsets or other state is saved in the onPartitionsRevoked call it is guaranteed to be saved by the time the process taking over that partition has their onPartitionsAssigned callback called to load the state.
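That ordering guarantee means any state written in onPartitionsRevoked is visible before any consumer's onPartitionsAssigned runs. A JDK-only sketch of that hand-off, with an in-memory map standing in for whatever shared store (or Kafka's own committed offsets) the listeners would actually use:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Shared store the revoking consumer writes to and the newly assigned consumer
// reads from. Kafka guarantees every onPartitionsRevoked completes before any
// onPartitionsAssigned is invoked, so the read always sees the latest write.
public class OffsetHandoffStore {
    private final Map<String, Long> offsets = new ConcurrentHashMap<>();

    // Called from onPartitionsRevoked on the old owner.
    public void save(String topic, int partition, long offset) {
        offsets.put(topic + ":" + partition, offset);
    }

    // Called from onPartitionsAssigned on the new owner; -1 if nothing was saved.
    public long load(String topic, int partition) {
        return offsets.getOrDefault(topic + ":" + partition, -1L);
    }
}
```

Note this hand-off fixes where the new owner starts reading; it does not by itself stop the old owner's in-flight batch, which is why the duplicate-processing window still needs the ownership check or an idempotent handler.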