
Java Kafka consumer group failing to consume a few messages

I have noticed that a Kafka consumer group (implemented in Java) consistently misses a few messages from the broker. As a first step in debugging, I checked with the Kafka console consumer and can see that those messages are available on the broker.

Kafka broker version: 0.10.1.0

Kafka client version:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.9.0.1</version>
</dependency>

Kafka consumer configuration:

Properties props = new Properties();
props.put("bootstrap.servers","broker1,broker2,broker3");
props.put("group.id", "myGroupIdForDemo");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("heartbeat.interval.ms", "25000"); 
props.put("session.timeout.ms", "30000"); 
props.put("max.poll.interval.ms", "300000");
props.put("max.poll.records", "1");
props.put("zookeeper.session.timeout.ms", "120000");
props.put("zookeeper.sync.time.ms", "10000");
props.put("auto.commit.enable", "false");
props.put("auto.commit.interval.ms", "60000");
props.put("auto.offset.reset", "earliest");
props.put("consumer.timeout.ms", "-1");
props.put("rebalance.max.retries", "20");
props.put("rebalance.backoff.ms", "6000");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
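
A side note on the configuration above: keys such as zookeeper.session.timeout.ms, zookeeper.sync.time.ms, auto.commit.enable, consumer.timeout.ms, rebalance.max.retries and rebalance.backoff.ms belong to the older ZooKeeper-based consumer. The Java KafkaConsumer spells the auto-commit switch enable.auto.commit, so with the settings shown auto-commit may still be on. Below is a minimal sketch of a check for such keys. The class name ConsumerConfigCheck and the key list are assumptions made for illustration (the list is hand-written and partial, not taken from the Kafka client), so treat a reported key as something to look up rather than as a definite error.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of the Kafka client library.
class ConsumerConfigCheck {

    // Hand-written, partial list of Java-consumer keys (an assumption, not exhaustive).
    static final Set<String> KNOWN_KEYS = new HashSet<>(Arrays.asList(
            "bootstrap.servers", "group.id",
            "key.deserializer", "value.deserializer",
            "heartbeat.interval.ms", "session.timeout.ms",
            "max.poll.interval.ms", "max.poll.records",
            "enable.auto.commit", "auto.commit.interval.ms",
            "auto.offset.reset"));

    // Returns the configured keys that are not in KNOWN_KEYS, sorted alphabetically.
    static Set<String> unrecognizedKeys(Properties props) {
        Set<String> unknown = new TreeSet<>(props.stringPropertyNames());
        unknown.removeAll(KNOWN_KEYS);
        return unknown;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "myGroupIdForDemo");
        props.put("auto.commit.enable", "false");     // old-consumer spelling
        props.put("zookeeper.sync.time.ms", "10000"); // old-consumer setting
        props.put("rebalance.backoff.ms", "6000");    // old-consumer setting
        System.out.println("Keys to double-check: " + unrecognizedKeys(props));
    }
}
```

Note also that max.poll.records and max.poll.interval.ms were introduced in the 0.10.x clients, so a 0.9.0.1 client would not apply them.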

EDIT - Addition of some more information

There are 6 partitions in total. However, there are 40 consumers on the topic, all sharing the same consumer group id. I do understand that 34 consumers sit idle and do nothing.

What I would like to understand is this: if a consumer fails to send heartbeats for long enough that the broker considers it dead and reassigns its partition, will any of the idle consumers get a chance to consume messages? The missing messages are always noticed in certain partitions only; that is, the messages that fail to be delivered/consumed always come from the same partitions.
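
To make the 6-partition, 40-consumer scenario concrete, here is a minimal sketch of a range-style assignment for a single topic. It assumes the group uses the range strategy and that members are ordered by id. The class name RangeAssignmentSketch and the member ids consumer-00 to consumer-39 are made up for illustration (real member ids look different), and this is a simplified model, not the client's own code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of range-style assignment for one topic (illustrative only).
class RangeAssignmentSketch {

    // Members must already be sorted; the first (numPartitions % members) get one extra partition.
    static Map<String, List<Integer>> assign(List<String> sortedMembers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (sortedMembers.isEmpty()) {
            return result;
        }
        int perMember = numPartitions / sortedMembers.size();
        int extra = numPartitions % sortedMembers.size();
        int next = 0;
        for (int i = 0; i < sortedMembers.size(); i++) {
            int count = perMember + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(next++);
            }
            result.put(sortedMembers.get(i), partitions);
        }
        return result;
    }

    // Prints only the members that own at least one partition.
    static void printOwners(String label, Map<String, List<Integer>> assignment) {
        System.out.println(label);
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
            if (!e.getValue().isEmpty()) {
                System.out.println("  " + e.getKey() + " -> " + e.getValue());
            }
        }
    }

    public static void main(String[] args) {
        List<String> members = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            members.add(String.format("consumer-%02d", i)); // hypothetical member ids
        }
        printOwners("Before:", assign(members, 6));

        // Simulate one active member being dropped after missed heartbeats.
        members.remove("consumer-02");
        printOwners("After consumer-02 leaves:", assign(members, 6));
    }
}
```

Under these assumptions, a previously idle member picks up a partition after the rebalance, and several partitions change owner at the same time.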

Any help is appreciated. Thanks.

asked Dec 22 '17 09:12 by thomas


1 Answer

1) The message may not exist in Kafka at all. In that case, check that the message size does not exceed the maximum message size allowed in the Kafka broker configs.

2) If your consumer is connected to Kafka instance 1 but not to instance 2, you may miss messages from instance 2, so specify all brokers in the consumer connection string.

3) If the message exists in Kafka and you are connected, deserialization may be failing. Try another deserializer, for example the byte array one instead of the string one, and see whether the message is consumed. If it is, the conversion to string is the problem.

4) The message could be "stolen" by another working consumer running under the same group ID. Choose a unique group ID.

5) Which logger do you use to see the consumed messages? Could it be a logger issue?

6) Maybe you kill/stop the consumer before it has consumed all messages?

7) Maybe you consume the message but then fail because of consumer memory limits? Try increasing -Xmx (the heap size).
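
The checks about the deserializer and the group ID can be combined into one throwaway diagnostic configuration. The following is a minimal sketch, not a definitive setup: the class name DiagnosticConsumerProps, the debug- prefix and the broker1:9092 style addresses are assumptions for illustration. It only builds the Properties, so it runs without the Kafka client on the classpath; the result would be passed to a KafkaConsumer<byte[], byte[]>.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper that builds settings for a one-off diagnostic consumer.
class DiagnosticConsumerProps {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Fresh group id on every call, so no other consumer shares the partitions.
        props.put("group.id", "debug-" + UUID.randomUUID());
        // Raw bytes: nothing can go wrong while converting the payload to a String.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // A new group has no committed offsets, so start from the oldest retained message.
        props.put("auto.offset.reset", "earliest");
        // Do not commit anything from the diagnostic run.
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        // The host:port values are placeholders; substitute the real broker addresses.
        Properties props = build("broker1:9092,broker2:9092,broker3:9092");
        System.out.println(props.getProperty("group.id"));
    }
}
```

If a consumer built from these settings sees the messages that the regular group misses, that points toward the group's assignment or deserialization rather than the broker.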

answered Oct 22 '22 22:10 by Vladimir Nabokov