 

Kafka consumer receiving same message multiple times

I've recently started using Kafka to read documents coming through a web crawler. What I'm noticing is that when I'm dealing with a few million documents, the consumer processes the same messages over and over again. It looks like the offsets are not getting committed for some reason. This is not the case when I test the consumer with a few hundred messages.

I'm using the Kafka high-level consumer client in Java, with a consumer group running as many threads as there are partitions, so each thread is dedicated to one partition. Here's a code snippet for polling data.

while (true) {
    try {
        if (consumerDao.canPollTopic()) {
            // Poll for up to the configured timeout; this thread only sees
            // records from its dedicated partition.
            ConsumerRecords<String, TextAnalysisRequest> records =
                consumer.poll(this.config.getPropertyAsInteger(IPreProcessorConstant.KAFKA_POLL_COUNT));
            for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                TextAnalysisRequest textAnalysisObj = record.value();
                if (textAnalysisObj != null) {
                    // Wrap the payload and hand it off for post-processing.
                    PostProcessRequest req = new PostProcessRequest();
                    req.setRequest(this.getRequest(textAnalysisObj));
                    PreProcessorUtil.submitPostProcessRequest(req, config);
                }
            }
        } else {
            // Nothing to poll right now; back off before checking again.
            Thread.sleep(this.config.getPropertyAsInteger(IPreProcessorConstant.KAFKA_POLL_SLEEP));
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}
Here are the Kafka consumer configuration parameters I'm setting; the rest are default values.


consumer.auto.commit=true
consumer.auto.commit.interval=1000
consumer.session.timeout=180000
consumer.poll.records=2147483647
consumer.request.timeout=181000
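
For reference, here is roughly how those wrapper keys translate when the consumer is constructed. This is a sketch only: the mapping is inferred from the full config dump below, not taken from my actual wiring code.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "kafkahost1:9092,kafkahost2:9092");
props.put("group.id", "full_group");
props.put("enable.auto.commit", "true");                           // consumer.auto.commit
props.put("auto.commit.interval.ms", "1000");                      // consumer.auto.commit.interval
props.put("session.timeout.ms", "180000");                         // consumer.session.timeout
props.put("max.poll.records", String.valueOf(Integer.MAX_VALUE));  // consumer.poll.records
props.put("request.timeout.ms", "181000");                         // consumer.request.timeout
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.test.preprocessor.consumer.serializer.KryoObjectSerializer");
KafkaConsumer<String, TextAnalysisRequest> consumer = new KafkaConsumer<>(props);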

Here's the complete consumer config:


metric.reporters = 
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 181000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.test.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 180000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest
My sample Kafka topic has 8 partitions with a replication factor of 2.
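
For reference, a topic like that would have been created with something along these lines (the topic name and ZooKeeper host here are placeholders, not my actual values):

bin/kafka-topics.sh --create --zookeeper zkhost:2181 \
    --replication-factor 2 --partitions 8 --topic crawler_docs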

The log retention period in server.properties is set to 168 hours.

log.retention.hours=168
log.roll.hours=168
Not sure what I'm missing here.

asked Sep 27 '16 by Shamik

People also ask

Can Kafka deliver same message twice?

Yes. Kafka's default delivery semantics are at-least-once, so a consumer can receive the same message more than once, for example when it resumes from the last committed offset after a crash or a rebalance. Note that a consumer can be assigned multiple partitions, but within a consumer group only one consumer is assigned to any given partition, so two consumers from the same group cannot read the same message from a partition.

How do I get rid of duplicate messages in Kafka?

2.1 Write an idempotent message handler. This is the easiest way to deal with duplicate messages. A message handler is idempotent if calling it multiple times with the same payload has no additional effect. For example, modifying an already-modified Order with the same payload should give the same result.
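
A minimal sketch of such a handler (the Order, OrderUpdate, and repository types here are hypothetical, used only to illustrate the idea):

// Hypothetical types; the handler assigns absolute state instead of
// applying a relative change, so replaying the same message leaves
// the Order in the same state.
void handleOrderUpdate(OrderUpdate update) {
    Order order = orderRepository.findById(update.getOrderId());
    order.setStatus(update.getStatus());  // same payload -> same end state
    orderRepository.save(order);
}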

Can a Kafka message be read multiple times?

A Kafka consumer reads data from the partitions of a topic; within a consumer group, only one consumer reads from a given partition at a time. Once a message has been read and its offset committed, the consumer won't receive it again in normal operation. However, Kafka retains messages for a configured period, so a message can be read again by seeking to an earlier offset or resetting the group's offsets.

What is idempotent consumer in Kafka?

An Idempotent Consumer pattern uses a Kafka consumer that can consume the same message any number of times but only process it once. The recommended way to implement the pattern is to add a table to the database that tracks processed messages, as in the sketch below.
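
A sketch of that approach using plain JDBC (dataSource, the processed_messages table, and applyBusinessLogic are assumed names for illustration):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLIntegrityConstraintViolationException;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Insert the record's ID and the business update in one transaction;
// a duplicate delivery hits the primary-key constraint and is skipped.
void process(ConsumerRecord<String, TextAnalysisRequest> record) throws Exception {
    String messageId = record.topic() + "-" + record.partition() + "-" + record.offset();
    try (Connection conn = dataSource.getConnection()) {
        conn.setAutoCommit(false);
        try (PreparedStatement ps = conn.prepareStatement(
                "INSERT INTO processed_messages (message_id) VALUES (?)")) {
            ps.setString(1, messageId);
            ps.executeUpdate();                    // throws on duplicate key
            applyBusinessLogic(conn, record.value());
            conn.commit();
        } catch (SQLIntegrityConstraintViolationException dup) {
            conn.rollback();                       // already processed: skip
        }
    }
}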


1 Answer

I increased auto.commit.interval.ms in my consumer properties from 3000 to 8000. This fixed the duplicate record issue.
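
If duplicates persist, the other common approach (a sketch, not something tested against the asker's setup) is to disable auto-commit and commit offsets manually, only after a batch has actually been processed:

props.put("enable.auto.commit", "false");
...
ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(1000);
for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
    process(record);       // your processing logic
}
consumer.commitSync();     // commit only after processing succeeded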

answered Nov 15 '22 by Aaron Faltesek