Effective strategy to avoid duplicate messages in an Apache Kafka consumer

I have been studying Apache Kafka for a month now, but I am stuck at one point. My use case: I have two or more consumer processes running on different machines. I ran a few tests in which I published 10,000 messages to the Kafka server. While these messages were being processed, I killed one of the consumer processes and restarted it. The consumers were writing processed messages to a file. After consumption finished, the file contained more than 10,000 messages, so some messages were duplicated.

In the consumer process I have disabled auto commit. The consumers commit offsets manually in batches: for example, once 100 messages have been written to the file, the consumer commits the offsets. When a single consumer process is running and it crashes and recovers, duplication is avoided this way. But when more than one consumer is running and one of them crashes and recovers, it writes duplicate messages to the file.

Is there any effective strategy to avoid these duplicate messages?

Shades88 asked Apr 15 '15 10:04




2 Answers

The short answer is no.

What you're looking for is exactly-once processing. While it may often seem feasible, it should never be relied upon because there are always caveats.

Even to attempt to prevent duplicates, you would need to use the simple consumer. The approach works like this: each consumer, when it consumes a message from a partition, writes the partition and offset of that message to disk. When the consumer restarts after a failure, it reads the last consumed offset for each partition from disk.
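As a rough sketch of that checkpointing step (plain Python, standard library only, no Kafka client; `save_offsets`, `load_offsets` and the JSON file layout are assumptions made for illustration, not part of any Kafka API):

```python
import json
import os


def save_offsets(path, offsets):
    """Write {partition: last_consumed_offset} to disk.

    The data goes to a temporary file first and is then renamed over the
    old checkpoint, so a crash mid-write does not leave a half-written file.
    """
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as tmp:
        json.dump(offsets, tmp)
        tmp.flush()
        os.fsync(tmp.fileno())
    os.replace(tmp_path, path)


def load_offsets(path):
    """Return the saved offsets, or an empty dict on the very first start."""
    try:
        with open(path) as f:
            # JSON object keys are strings, so convert partitions back to int.
            return {int(p): o for p, o in json.load(f).items()}
    except FileNotFoundError:
        return {}
```

On restart the consumer would call `load_offsets` and resume each partition just after the stored offset. This only makes the checkpoint itself durable; it does not tie the checkpoint to the processing step.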

But even with this pattern the consumer can't guarantee it won't reprocess a message after a failure. What if the consumer consumes a message and then fails before the offset is flushed to disk? If you write to disk before you process the message, what if you write the offset and then fail before actually processing the message? This same problem would exist even if you were to commit offsets to ZooKeeper after every message.
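Both failure orderings can be seen in a toy simulation. This is plain Python with no Kafka involved; `run`, `commit_first` and `crash_at` are names invented for this sketch, and the "crash" is simply a jump back to the last committed offset:

```python
def run(messages, commit_first, crash_at):
    """Simulate one consumer that crashes once and restarts from its commit.

    The crash happens at offset `crash_at`, between the two steps
    (processing the message and committing its offset).
    Returns the list of messages that were actually processed.
    """
    processed = []   # side effects that really happened
    committed = 0    # next offset to read, as stored durably
    crashed = False
    offset = committed
    while offset < len(messages):
        if commit_first:
            committed = offset + 1
            if offset == crash_at and not crashed:
                crashed = True
                offset = committed  # restart: this message is skipped
                continue
            processed.append(messages[offset])
        else:
            processed.append(messages[offset])
            if offset == crash_at and not crashed:
                crashed = True
                offset = committed  # restart: this message is read again
                continue
            committed = offset + 1
        offset += 1
    return processed


duplicated = run(["a", "b", "c"], commit_first=False, crash_at=1)
lost = run(["a", "b", "c"], commit_first=True, crash_at=1)
```

Processing before committing gives at-least-once behaviour (`duplicated` contains "b" twice), while committing before processing gives at-most-once behaviour (`lost` is missing "b").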

There are some use cases, though, where exactly-once processing is more attainable. This simply requires that your offset be stored in the same location as your application's output. For instance, if you write a consumer that counts messages, by storing the last counted offset with each count you can guarantee that the offset is stored at the same time as the consumer's state. Of course, in order to guarantee exactly-once processing this would require that you consume exactly one message and update the state exactly once for each message, and that's completely impractical for most Kafka consumer applications. By its nature, Kafka consumes messages in batches for performance reasons.
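A minimal sketch of the counting example, assuming the state lives in a single JSON file (the `CountingConsumer` class and its file format are hypothetical, and no Kafka client is used). The count and the last counted offset are in one record, so they are always written together:

```python
import json
import os


class CountingConsumer:
    """Counts messages per partition; count and last offset share one record."""

    def __init__(self, path):
        self.path = path
        try:
            with open(path) as f:
                self.state = json.load(f)
        except FileNotFoundError:
            # Layout: {"<partition>": {"count": n, "last_offset": k}}
            self.state = {}

    def handle(self, partition, offset):
        """Count one message; return False if it was already counted."""
        entry = self.state.setdefault(
            str(partition), {"count": 0, "last_offset": -1}
        )
        if offset <= entry["last_offset"]:
            return False  # redelivered after a restart, skip it
        entry["count"] += 1
        entry["last_offset"] = offset
        self._persist()
        return True

    def _persist(self):
        # Write to a temporary file, then rename it over the old state,
        # so the count and the offset change on disk in a single step.
        tmp_path = self.path + ".tmp"
        with open(tmp_path, "w") as tmp:
            json.dump(self.state, tmp)
        os.replace(tmp_path, self.path)
```

Persisting on every message, as described above, is what makes this slow in practice.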

Usually your time will be better spent, and your application will be much more reliable, if you simply design it to be idempotent.
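To illustrate what idempotent means here, a small sketch under assumed names (`apply_update`, and messages carrying a hypothetical `id` and `status` field): the handler sets a value by key instead of appending or incrementing, so a redelivered message leaves the result unchanged.

```python
def apply_update(store, message):
    """Idempotent handler: set a value by key rather than append or increment."""
    store[message["id"]] = message["status"]


messages = [
    {"id": "order-1", "status": "paid"},
    {"id": "order-2", "status": "shipped"},
]

# Each message delivered exactly once.
once = {}
for m in messages:
    apply_update(once, m)

# The second message is redelivered after a consumer restart.
replayed = {}
for m in messages + messages[1:]:
    apply_update(replayed, m)
```

Both runs end in the same state. Appending each message to a file, as in the question, is not idempotent, which is why the duplicates become visible there.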

2 revs, 2 users 96% answered Sep 21 '22 07:09



This is what the Kafka FAQ has to say on the subject of exactly-once:

How do I get exactly-once messaging from Kafka?

Exactly once semantics has two parts: avoiding duplication during data production and avoiding duplicates during data consumption.

There are two approaches to getting exactly once semantics during data production:

  • Use a single-writer per partition and every time you get a network error check the last message in that partition to see if your last write succeeded.
  • Include a primary key (UUID or something) in the message and deduplicate on the consumer.

If you do one of these things, the log that Kafka hosts will be duplicate-free. However, reading without duplicates depends on some co-operation from the consumer too. If the consumer is periodically checkpointing its position then if it fails and restarts it will restart from the checkpointed position. Thus if the data output and the checkpoint are not written atomically it will be possible to get duplicates here as well. This problem is particular to your storage system. For example, if you are using a database you could commit these together in a transaction. The HDFS loader Camus that LinkedIn wrote does something like this for Hadoop loads. The other alternative that doesn't require a transaction is to store the offset with the data loaded and deduplicate using the topic/partition/offset combination.
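Both consumer-side options from the paragraph above can be sketched with SQLite from the Python standard library. The table names, column names and helper functions below are assumptions for illustration only, and the records are plain tuples rather than anything read from a real Kafka client. The output rows and the checkpoint are committed in one transaction, and the primary key on topic/partition/offset makes a redelivered row a no-op:

```python
import sqlite3


def open_store(path=":memory:"):
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS output ("
        " topic TEXT, part INTEGER, msg_offset INTEGER, payload TEXT,"
        " PRIMARY KEY (topic, part, msg_offset))"
    )
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoint ("
        " topic TEXT, part INTEGER, next_offset INTEGER,"
        " PRIMARY KEY (topic, part))"
    )
    return conn


def store_batch(conn, topic, part, records):
    """Store (offset, payload) pairs and the new checkpoint in one transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        for msg_offset, payload in records:
            # Rows whose (topic, part, msg_offset) already exist are ignored.
            conn.execute(
                "INSERT OR IGNORE INTO output VALUES (?, ?, ?, ?)",
                (topic, part, msg_offset, payload),
            )
        if records:
            conn.execute(
                "INSERT OR REPLACE INTO checkpoint VALUES (?, ?, ?)",
                (topic, part, records[-1][0] + 1),
            )


def next_offset(conn, topic, part):
    """Offset to resume from after a restart (0 if nothing was stored yet)."""
    row = conn.execute(
        "SELECT next_offset FROM checkpoint WHERE topic = ? AND part = ?",
        (topic, part),
    ).fetchone()
    return row[0] if row else 0
```

After a crash, the consumer would ask `next_offset` where to resume; if a batch overlaps with rows stored earlier, the overlapping rows are dropped by the key constraint.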

I think there are two improvements that would make this a lot easier:

  • Producer idempotence could be done automatically and much more cheaply by optionally integrating support for this on the server.
  • The existing high-level consumer doesn't expose a lot of the more fine-grained control of offsets (e.g. to reset your position). We will be working on that soon.
RaGe answered Sep 18 '22 07:09
