I am writing a consumer that manually commits the offset once a series of records has been committed to Mongo.
In the case of a Mongo error, or any other error, an attempt is made to persist the record to an error-processing collection
for replay at a later date.
If Mongo is down, I want the consumer to stop processing for a period of time before trying to read the records from the uncommitted offset in Kafka again.
The sample below works, but I would like to know what the best practice for this scenario is.
while (true) {
    boolean commit = false;
    try {
        ConsumerRecords<K, V> records = consumer.poll(consumerTimeout);
        kafkaMessageProcessor.processRecords(records);
        commit = true;
    }
    catch (Exception e) {
        logger.error("Unable to consume closing consumer and restarting", e);
        try {
            consumer.close();
        }
        catch (Exception consumerCloseError) {
            logger.error("Unable to close consumer", consumerCloseError);
        }
        logger.error(String.format("Attempting recovery in [%d] milliseconds.", recoveryInterval), e);
        Thread.sleep(recoveryInterval);
        consumer = createConsumer(properties);
    }
    if (commit) {
        consumer.commitSync();
    }
}

private KafkaConsumer<K, V> createConsumer(Properties properties) {
    KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties);
    consumer.subscribe(topics);
    return consumer;
}
If I don't recreate the consumer, I get the following error:
o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483647 dead.
o.a.k.c.c.internals.ConsumerCoordinator : Error ILLEGAL_GENERATION occurred while committing offsets for group test.consumer
Here is my code, using client version 0.10.0.
It seems to fit your requirement.
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageProcesser implements Runnable {

    private static Logger logger = LoggerFactory.getLogger(MessageProcesser.class);

    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    private final KafkaConsumer<String, String> consumer;

    private final String topic;

    private final AtomicBoolean closed = new AtomicBoolean(false);

    public MessageProcesser(String groupId, String topic, String kafkaServer) {
        this.topic = topic;
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaServer);
        props.put("group.id", groupId);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");
        this.consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Collections.singleton(topic));

            while (true) {
                if (closed.get()) {
                    consumer.close();
                    return; // stop polling once shutdown() has been called
                }

                ConsumerRecords<String, String> records = consumer.poll(1000 * 60);
                for (ConsumerRecord<String, String> record : records) {

                    String value = record.value();
                    if (null == value) {
                        continue;
                    }

                    boolean processResult = false;
                    try {
                        // ProcessCommand is the Callable<Object> that does the actual work
                        Future<Object> f = pool.submit(new ProcessCommand(value));
                        processResult = (boolean) f.get(100, TimeUnit.MILLISECONDS);
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }

                    if (!processResult) {
                        // processing failed: seek back to the current offset so the
                        // record is returned again by the next poll()
                        consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
                    } else {
                        this.commitAsyncOffset(record);
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            if (!closed.get()) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e1) {
                    // ignore
                }
            }
        }
    }

    public void shutdown() {
        closed.set(true);
    }

    public void commitAsyncOffset(ConsumerRecord<String, String> record) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));

        consumer.commitAsync(offsets, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                if (e != null) {
                    logger.error("kafka offset commit fail. " + offsets, e);
                }
            }
        });
    }
}
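For reference, wiring the class up could look roughly like the sketch below; the group id, topic and broker address are just placeholders.
public class MessageProcesserMain {

    public static void main(String[] args) throws InterruptedException {
        // placeholder group id, topic and broker address
        final MessageProcesser processer = new MessageProcesser("test.consumer", "my-topic", "localhost:9092");

        Thread worker = new Thread(processer);
        worker.start();

        // on JVM shutdown, flip the closed flag so run() closes the consumer and exits
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                processer.shutdown();
            }
        }));

        worker.join();
    }
}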
As I understand it, the (new) client is the one that keeps track of the consumed offsets. The commit sends the offsets to the server, but it has no effect on the next poll from that client, since the client tells the server "give me the next messages from THAT offset". Why is the offset sent to the server, then? For the next rebalance. The only situation in which the server uses the committed offsets is when a client dies or disconnects: the partitions are then rebalanced, and with that rebalance the clients get their offsets from the server.
So if you don't commit the offset and then call poll(), you cannot expect the message to be read again. For that, there would have to be a way to roll back the offset in the client. I haven't tried it, but I think calling KafkaConsumer.seek with the offset of the failed message should do the trick.
https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)
BTW, this way you can even commit the last successfully processed message and seek to the first failed one, so that you don't need to repeat the whole record list when a failure occurred for some message in the middle of it.
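A hedged sketch of that idea, reusing the imports from the code above: writeToMongo(...) is a hypothetical persistence call that returns false on failure, and for simplicity the batch is assumed to come from a single partition. The offset after the last successful record is committed once, and the consumer seeks back to the first failed record so the next poll() returns it again.
private void processBatch(KafkaConsumer<String, String> consumer,
                          ConsumerRecords<String, String> records) {
    ConsumerRecord<String, String> lastSuccess = null;

    for (ConsumerRecord<String, String> record : records) {
        if (writeToMongo(record.value())) {   // hypothetical Mongo write, false on failure
            lastSuccess = record;
        } else {
            // rewind to the first failed record so the next poll() returns it again
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
            break;                            // skip the rest of the batch to keep ordering
        }
    }

    if (lastSuccess != null) {
        // commit the offset *after* the last record that made it into Mongo
        consumer.commitSync(Collections.singletonMap(
                new TopicPartition(lastSuccess.topic(), lastSuccess.partition()),
                new OffsetAndMetadata(lastSuccess.offset() + 1)));
    }
}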
If you don't commit the offset and the enable.auto.commit property is false, then when the call to Mongo fails you just wait as long as you think is necessary and retry the poll().
The problem you are seeing is that the new consumer uses poll() as a heartbeat mechanism, so if you wait longer than the request timeout, the coordinator for the topic will kick the consumer out of the group because it thinks it is dead, and it will rebalance the group. So wait for Mongo, but you may want to poll() once in a while.
EDIT: As a workaround, you can set the request.timeout.ms property higher.
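A hedged sketch of that workaround, using the same Properties setup as the code above; the timeout values are placeholders, and session.timeout.ms is the companion setting that determines how long the coordinator waits before declaring a consumer dead.
Properties props = new Properties();
props.put("bootstrap.servers", kafkaServer);   // placeholder
props.put("group.id", groupId);                // placeholder
props.put("enable.auto.commit", "false");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

// give the consumer more room between polls while waiting for Mongo
props.put("request.timeout.ms", "120000");     // placeholder value
// how long the coordinator waits before it considers the consumer dead
props.put("session.timeout.ms", "60000");      // placeholder value

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);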
Hope it helps!