I've recently started using Kafka to read documents coming from a web crawler. When I'm dealing with a few million documents, the consumer processes the same messages over and over again. It looks like the offsets are not getting committed for some reason. This does not happen when I test the consumer with a few hundred messages.
I'm using the Kafka high-level consumer client in Java. I'm running a consumer group with as many threads as there are partitions, so each thread is dedicated to a partition. Here's the code snippet for polling data:
while (true) {
    try {
        if (consumerDao.canPollTopic()) {
            // Note: the argument to poll() is a timeout in milliseconds.
            ConsumerRecords<String, TextAnalysisRequest> records =
                consumer.poll(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_COUNT));
            for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                // Skip records whose value is null.
                TextAnalysisRequest textAnalysisObj = record.value();
                if (textAnalysisObj != null) {
                    PostProcessRequest req = new PostProcessRequest();
                    req.setRequest(this.getRequest(textAnalysisObj));
                    PreProcessorUtil.submitPostProcessRequest(req, config);
                }
            }
        } else {
            // Polling is paused; wait before checking again.
            Thread.sleep(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_SLEEP));
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}
Here are the Kafka consumer configuration parameters I'm setting. The rest are default values.
consumer.auto.commit=true
consumer.auto.commit.interval=1000
consumer.session.timeout=180000
consumer.poll.records=2147483647
consumer.request.timeout=181000
Here's the complete consumer config:
metric.reporters =
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 181000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.test.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 180000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest
My sample Kafka topic has 8 partitions with a replication factor of 2.
The log retention period in server.properties is set to 168 hours:
log.retention.hours=168
log.roll.hours=168
Not sure what I'm missing here.
A consumer can be assigned multiple partitions. The rule in Kafka is that, within a consumer group, each partition of a topic is assigned to only one consumer at a time, so multiple consumers in the same group cannot read the same message from a partition.
2.1 Write idempotent message handler
This is the easiest way to deal with duplicate messages. A message handler is idempotent if calling it multiple times with the same payload has no additional effect. For example, modifying an already modified Order with the same payload should give the same result.
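As a rough illustration of the Order example, here is a minimal sketch of an idempotent handler. The class and method names (OrderStatusHandler, handle, statusOf, orderCount) are made up for this example, and a HashMap stands in for whatever store holds the orders. The handler writes an absolute value instead of applying a relative change, so a redelivered message does not alter the result.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: OrderStatusHandler and its methods are hypothetical names.
class OrderStatusHandler {
    // In-memory stand-in for an orders table: orderId -> status.
    private final Map<String, String> statusByOrderId = new HashMap<>();

    // Idempotent: writes an absolute value, so handling the same message
    // twice leaves the store in the same state as handling it once.
    void handle(String orderId, String newStatus) {
        statusByOrderId.put(orderId, newStatus);
    }

    String statusOf(String orderId) {
        return statusByOrderId.get(orderId);
    }

    int orderCount() {
        return statusByOrderId.size();
    }

    public static void main(String[] args) {
        OrderStatusHandler handler = new OrderStatusHandler();
        handler.handle("order-1", "SHIPPED");
        handler.handle("order-1", "SHIPPED"); // simulated duplicate delivery
        System.out.println(handler.statusOf("order-1") + " / " + handler.orderCount());
    }
}
```

A handler that, say, incremented a counter on every message would not have this property and would need explicit deduplication.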
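A Kafka consumer reads data from the partitions of a topic that are assigned to it. Within a consumer group, a partition is read by only one consumer at a time. Once the offset of a message has been committed, the group does not receive that message again unless a consumer seeks back to an earlier offset; a message whose offset was never committed can be redelivered.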
An Idempotent Consumer pattern uses a Kafka consumer that can consume the same message any number of times, but only process it once. To implement the Idempotent Consumer pattern the recommended approach is to add a table to the database to track processed messages.
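A minimal sketch of that idea, with an in-memory set standing in for the database table of processed messages. DedupingProcessor and its methods are hypothetical names, and the message ID is assumed to be something stable per message (for example topic, partition and offset joined together). In a real system the ID insert and the business side effect would normally need to happen in the same transaction, which this sketch does not attempt.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: DedupingProcessor is a hypothetical name, and the
// HashSet is a stand-in for a "processed messages" database table.
class DedupingProcessor {
    private final Set<String> processedIds = new HashSet<>();
    private final List<String> handledPayloads = new ArrayList<>();

    // Returns true if the message was processed, false if it was a duplicate.
    boolean process(String messageId, String payload) {
        // Set.add returns false when the ID is already present.
        if (!processedIds.add(messageId)) {
            return false;
        }
        handledPayloads.add(payload); // the actual business side effect
        return true;
    }

    List<String> handledPayloads() {
        return handledPayloads;
    }

    public static void main(String[] args) {
        DedupingProcessor processor = new DedupingProcessor();
        // Assumed ID format: topic-partition-offset.
        processor.process("docs-0-42", "document A");
        processor.process("docs-0-42", "document A"); // redelivered, skipped
        processor.process("docs-0-43", "document B");
        System.out.println(processor.handledPayloads());
    }
}
```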
I increased auto.commit.interval.ms in my consumer properties from 3000 to 8000. This fixed the duplicate record issue.
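For reference, here is roughly what that change looks like when the consumer properties are built in code. This is only a sketch: the class and method names are made up, and the max.poll.records value of 500 is my own assumption, not something from the question. I can't confirm it is the cause there, but the config in the question sets max.poll.records to 2147483647, so a single poll() can return a very large batch, and if processing that batch takes longer than the consumer is allowed between polls, the group may rebalance before the offsets are committed. Bounding the batch size may be worth trying alongside the commit interval.

```java
import java.util.Properties;

// Illustrative only: ConsumerTuningSketch and tunedConsumerProps are
// hypothetical names. The keys are standard Kafka consumer settings.
class ConsumerTuningSketch {
    static Properties tunedConsumerProps() {
        Properties props = new Properties();
        props.setProperty("group.id", "full_group");
        props.setProperty("enable.auto.commit", "true");
        // Commit interval raised to 8000 ms, as described in the answer.
        props.setProperty("auto.commit.interval.ms", "8000");
        // Assumption: cap the batch returned by one poll() so it can be
        // processed well within the session timeout.
        props.setProperty("max.poll.records", "500");
        props.setProperty("session.timeout.ms", "180000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = tunedConsumerProps();
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

The Properties object would then be passed to the consumer constructor together with the bootstrap servers and deserializers already shown in the question.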