Does Kafka have a batch consumer?

Tags:

apache-kafka

The High-Level consumer API seems to be reading one message at a time.

This could be quite problematic for consumers that want to process those messages and submit them to downstream systems like Solr or Elasticsearch, which prefer to receive messages in bulk rather than one at a time.

Batching those messages in memory is not trivial either, because the offsets in Kafka then need to be synced only once the batch has been committed downstream; otherwise a crashed kafka-consumer with uncommitted downstream messages (as in Solr or ES) would already have had its offsets updated and would hence lose messages.

The consumer could consume messages more than once if it crashes after committing messages downstream but before updating message offsets.
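
One way to make such redeliveries harmless would be to write downstream idempotently, e.g. by deriving the document ID from the record's coordinates so that a replayed record overwrites the earlier copy instead of creating a duplicate. A rough sketch, assuming the downstream store (Solr, Elasticsearch) supports upserts keyed by document ID:

 // Sketch only: a redelivered record maps to the same document ID and
 // overwrites the earlier copy instead of duplicating it, provided the
 // sink upserts by ID.
 static String documentId(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record) {
     return record.topic() + "-" + record.partition() + "-" + record.offset();
 }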

If Kafka does offer a way to consume messages in batches, some pointers to the code/documentation would be much appreciated.

Thanks!

asked Mar 03 '16 by user2250246
1 Answer

I am not aware of a batch consumer. But even if there were one, your main problem would remain: you want to commit the offset only after you have successfully forwarded the data. One way to achieve this is to turn off the consumer's auto commit by setting the property enable.auto.commit = false (as in the example below). The tradeoff, of course, is that you then have to take care of when to commit your offsets.

The full documentation of the consumer properties can be found here: https://kafka.apache.org/documentation.html#consumerconfigs

A nice example of how to manually commit offsets, adapted from the Javadoc (https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html):

 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;

 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("group.id", "test");
 // Disable auto commit so offsets only advance after the batch has been
 // written downstream.
 props.put("enable.auto.commit", "false");
 props.put("auto.commit.interval.ms", "1000"); // ignored while auto commit is off
 props.put("session.timeout.ms", "30000");
 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
 consumer.subscribe(Arrays.asList("foo", "bar"));

 final int minBatchSize = 200;
 List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
 while (true) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records) {
         buffer.add(record);
     }
     if (buffer.size() >= minBatchSize) {
         // insertIntoDb stands in for the downstream write (e.g. Solr or
         // Elasticsearch); commit the offsets only after it has succeeded.
         insertIntoDb(buffer);
         consumer.commitSync();
         buffer.clear();
     }
 }
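
Note that the parameterless commitSync() commits the positions returned by the last poll(). If you want to tie the commit to exactly the records you have forwarded, the same Javadoc also shows committing explicit per-partition offsets. A minimal sketch along those lines (insertIntoDb is again a placeholder for the downstream write; the committed offset is the position of the next message to read, hence the + 1):

 import java.util.Collections;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;

 while (true) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (TopicPartition partition : records.partitions()) {
         List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
         // Forward the whole partition batch downstream first ...
         insertIntoDb(partitionRecords);
         // ... then commit exactly up to the last record that was written.
         long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
         consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
     }
 }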
answered Sep 25 '22 by TobiSH