
Having a Kafka Consumer read a single message at a time

Tags:

apache-kafka

We have Kafka set up so that messages can be processed in parallel by several servers. But every message must be processed exactly once (and by only one server). We have this up and running and it's working fine.

Now, the problem for us is that the Kafka consumer reads messages in batches for maximal efficiency. This becomes a problem if/when processing fails, the server shuts down, or similar, because then we lose data that was about to be processed.

Is there a way to get the consumer to only read one message at a time and let Kafka keep the unprocessed messages? Something like: consumer pulls one message -> process -> commit offset when done, repeat. Is this feasible using Kafka? Any thoughts/ideas?

Thanks!

asked Aug 17 '15 by Martin at Mavrix

3 Answers

You might try setting max.poll.records to 1.
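With kafka-python you would create a `KafkaConsumer` with `max_poll_records=1` and `enable_auto_commit=False`, then poll and commit around each record. To keep it runnable without a broker, this sketch simulates the topic-partition with an in-memory stand-in; the loop structure is the point, and the class/function names here are illustrative, not from any Kafka client.

```python
# Minimal sketch of the poll -> process -> commit loop, one record at a time.
# The broker is simulated in memory so the logic runs without a Kafka cluster;
# with a real client you would poll one record and commit its offset in the
# same two places marked below.

class FakeBroker:
    """Stands in for a single topic-partition: records plus a committed offset."""
    def __init__(self, records):
        self.records = records
        self.committed = 0  # offset of the next unprocessed record

    def poll_one(self):
        if self.committed < len(self.records):
            return self.committed, self.records[self.committed]
        return None

    def commit(self, offset):
        self.committed = offset + 1

def consume_one_at_a_time(broker, process):
    """Pull one record, process it, then commit its offset -- repeat."""
    while (item := broker.poll_one()) is not None:
        offset, record = item
        process(record)        # if this raises, the offset is never committed,
        broker.commit(offset)  # so the record is re-read after a restart

processed = []
broker = FakeBroker(["a", "b", "c"])
consume_one_at_a_time(broker, processed.append)
```

Because the commit happens only after `process` returns, a crash mid-processing leaves the offset where it was, and the message is redelivered on restart.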

answered Oct 16 '22 by gengel


You mention having exactly-once processing, but then you're worried about losing data. I'm assuming you're just worried about the edge case when one of your servers fails and you lose data?

I don't think there's a way to consume one message at a time. Looking through the consumer configurations, there only seems to be an option for setting the maximum bytes a consumer can fetch from Kafka, not the number of messages:

fetch.message.max.bytes

But if you're worried about losing data completely: if you never commit the offset, Kafka will not mark it as committed and the message won't be lost. From the Kafka documentation on delivery semantics:

So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer and committing its offset prior to processing a batch of messages. Exactly-once delivery requires co-operation with the destination storage system but Kafka provides the offset which makes implementing this straight-forward.
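The quoted semantics come down to where the commit happens relative to processing. This runnable sketch (the broker is again simulated in memory, with a crash injected mid-stream) shows the difference: committing first can drop a record (at-most-once), while processing first leaves the offset behind the failed record so it would be retried (at-least-once).

```python
# Illustrates commit-before-processing (at-most-once) vs.
# commit-after-processing (at-least-once) using a simulated crash.

class Crash(Exception):
    pass

def run(records, commit_first):
    committed = 0   # next offset the "broker" would hand out after a restart
    processed = []
    offset = 0
    try:
        while offset < len(records):
            if commit_first:
                committed = offset + 1    # at-most-once ordering
            if records[offset] == "BOOM":
                raise Crash               # simulated server failure
            processed.append(records[offset])
            if not commit_first:
                committed = offset + 1    # at-least-once ordering
            offset += 1
    except Crash:
        pass
    return processed, committed

# Commit-first has already committed past "BOOM" when it crashes,
# so that record is lost. Process-first leaves the offset pointing
# at "BOOM", so a restarted consumer would see it again.
p1, c1 = run(["a", "BOOM", "c"], commit_first=True)   # (["a"], 2)
p2, c2 = run(["a", "BOOM", "c"], commit_first=False)  # (["a"], 1)
```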

So exactly-once processing is not something Kafka enables by default. It requires you to store the offset whenever you write the output of your processing to storage.

But this can be handled more simply and generally by simply letting the consumer store its offset in the same place as its output...As an example of this, our Hadoop ETL that populates data in HDFS stores its offsets in HDFS with the data it reads so that it is guaranteed that either data and offsets are both updated or neither is.
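The "store the offset with the output" pattern from the quote can be sketched with sqlite3 as the destination store (any transactional store works the same way; the table and function names here are made up for illustration). The output row and the consumer offset are written in one transaction, so a crash leaves both updated or neither, which is what makes exactly-once processing possible:

```python
# Write processing output and the consumer offset in a single atomic
# transaction, instead of committing the offset back to Kafka.
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE output  (kafka_offset INTEGER PRIMARY KEY, value TEXT);
    CREATE TABLE offsets (topic_partition TEXT PRIMARY KEY, next_offset INTEGER);
""")
db.execute("INSERT INTO offsets VALUES ('mytopic-0', 0)")

def process_record(offset, value):
    with db:  # one transaction: output row and offset commit together, or not at all
        db.execute("INSERT INTO output VALUES (?, ?)", (offset, value.upper()))
        db.execute(
            "UPDATE offsets SET next_offset = ? WHERE topic_partition = 'mytopic-0'",
            (offset + 1,),
        )

# On startup the consumer would seek() to the offset stored here rather
# than trusting Kafka's own committed offset.
for off, val in enumerate(["a", "b", "c"]):
    process_record(off, val)
```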

I hope that helps.

answered Oct 16 '22 by Morgan Kenyon


It depends on which client you are going to use. For C++ and Python, it is possible to consume ONE message each time.

For Python, I used https://github.com/mumrah/kafka-python. The following code consumes one message each time:

message = self.__consumer.get_message(block=False, timeout=self.IterTimeout, get_partition_info=True )

__consumer is a SimpleConsumer object.

See my question and answer here: How to stop Python Kafka Consumer in program?

For C++, I am using https://github.com/edenhill/librdkafka. The following code consumes one message each time:

while( m_bRunning )
{
        // Start to read messages from the local queue.
        RdKafka::Message *msg = m_consumer->consume(m_topic, m_partition, 1000);
        msg_consume(msg);
        delete msg;
        m_consumer->poll(0);
}

m_consumer is a pointer to a C++ Consumer object (C++ API).

Hope this helps.

answered Oct 16 '22 by BAE