Kafka - Delayed Queue implementation using high level consumer

I want to implement a delayed consumer using the high level consumer API.

main idea:

  • produce messages by key (each message contains a creation timestamp); this ensures each partition's messages are ordered by produce time.
  • auto.commit.enable=false (will explicitly commit after each message process)
  • consume a message
  • check message timestamp and check if enough time has passed
  • process message (this operation will never fail)
  • commit 1 offset

    while (it.hasNext()) {
      val msg = it.next().message()
      //checks timestamp in msg to see delay period exceeded
      while (!delayedPeriodPassed(msg)) {
        waitSomeTime() //Thread.sleep or something....
      }
      //certain that the msg was delayed and can now be handled
      Try { process(msg) } //the msg process will never fail the consumer
      consumer.commitOffsets //commit each msg
    }

some concerns about this implementation:

  1. committing each offset might slow ZK down
  2. can consumer.commitOffsets throw an exception? if yes, I will consume the same message twice (can solve with idempotent messages)
  3. problem waiting a long time without committing the offset, for example when the delay period is 24 hours: get next from iterator, sleep for 24 hours, process and commit (ZK session timeout?)
  4. how can the ZK session keep alive without committing new offsets? (setting a high zookeeper.session.timeout.ms can result in a dead consumer without recognising it)
  5. any other problems I'm missing?

Thanks!

asked Aug 02 '15 by Nimrod007


2 Answers

One way to go about this would be to use a different topic where you push all messages that are to be delayed. If all delayed messages should be processed after the same time delay this will be fairly straightforward:

    while (it.hasNext()) {
      val message = it.next().message()

      if (shouldBeDelayed(message)) {
        val delay = 24 hours
        val delayTo = getCurrentTime() + delay
        putMessageOnDelayedQueue(message, delay, delayTo)
      } else {
        process(message)
      }

      consumer.commitOffset()
    }

All regular messages will now be processed as soon as possible, while those that need a delay get put on another topic.

The nice thing is that we know that the message at the head of the delayed topic is the one that should be processed first, since its delayTo value will be the smallest. Therefore we can set up another consumer that reads the head message, checks if the timestamp is in the past and, if so, processes the message and commits the offset. If not, it does not commit the offset and instead just sleeps until that time:

    while (it.hasNext()) {
      val delayedMessage = it.peek().message()
      if (delayedMessage.delayTo < getCurrentTime()) {
        val readMessage = it.next().message()
        process(readMessage.originalMessage)
        consumer.commitOffset()
      } else {
        delayProcessingUntil(delayedMessage.delayTo)
      }
    }
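The sleep-until step in the loop above can be sketched independently of Kafka. Assuming delayTo is an epoch-millis timestamp carried in the message (a hypothetical field, as in the answer's pseudocode), the wait computation is just:

```java
import java.time.Instant;

public class DelayUntil {
    // Returns how long (ms) a consumer should sleep before a message whose
    // delayTo timestamp (epoch millis) becomes eligible; 0 if already due.
    static long millisUntilDue(long delayToEpochMillis, long nowEpochMillis) {
        return Math.max(0, delayToEpochMillis - nowEpochMillis);
    }

    public static void main(String[] args) {
        long now = Instant.parse("2015-08-02T18:00:00Z").toEpochMilli();
        long due = Instant.parse("2015-08-03T18:00:00Z").toEpochMilli(); // 24h later
        System.out.println(millisUntilDue(due, now));     // 86400000 (24h in ms)
        System.out.println(millisUntilDue(now - 1, now)); // 0 (already due)
    }
}
```

Note that a plain Thread.sleep of this duration keeps the thread blocked; with long delays the consumer still has to respect the session-timeout concerns raised in the question.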

In case there are different delay times you could partition the topic on the delay (e.g. 24 hours, 12 hours, 6 hours). If the delay time is more dynamic than that it becomes a bit more complex. You could solve it by introducing two delay topics. Read all messages off delay topic A and process all the messages whose delayTo values are in the past. Among the others you just find the one with the closest delayTo and then put them on topic B. Sleep until the closest one should be processed and do it all in reverse, i.e. process messages from topic B and put the ones that shouldn't yet be processed back on topic A.
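The two-topic "ping-pong" above can be sketched with in-memory queues standing in for the Kafka topics (a simplification; real topics, producers, and offset commits are omitted):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// In-memory sketch of the two-topic ping-pong for arbitrary delays:
// drain one topic, process everything already due, re-publish the rest
// to the other topic, then swap roles on the next sweep.
public class DelayPingPong {
    record Delayed(String payload, long delayTo) {}

    // Drains 'from': due messages are handled, the rest move to 'to'.
    static void sweep(Queue<Delayed> from, Queue<Delayed> to,
                      long now, Queue<String> processed) {
        while (!from.isEmpty()) {
            Delayed m = from.poll();
            if (m.delayTo() <= now) {
                processed.add(m.payload());   // due: handle it now
            } else {
                to.add(m);                    // not due: park on the other topic
            }
        }
    }

    public static void main(String[] args) {
        Queue<Delayed> topicA = new ArrayDeque<>();
        Queue<Delayed> topicB = new ArrayDeque<>();
        Queue<String> processed = new ArrayDeque<>();
        topicA.add(new Delayed("due", 100));
        topicA.add(new Delayed("later", 500));

        sweep(topicA, topicB, /*now=*/200, processed); // "due" handled, "later" parked
        System.out.println(processed);      // [due]

        // time passes; swap roles and sweep the other way
        sweep(topicB, topicA, /*now=*/600, processed);
        System.out.println(processed);      // [due, later]
    }
}
```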

To answer your specific questions (some have been addressed in the comments to your question):

  1. Commit each offset might slow ZK down

You could consider switching to storing the offsets in Kafka (a feature available from 0.8.2; check out the offsets.storage property in the consumer config).
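A minimal sketch of such a consumer config, assuming the 0.8.2-era high-level consumer (the broker address and group id are placeholders):

```java
import java.util.Properties;

// Sketch of a 0.8.2 high-level consumer config that commits offsets to
// Kafka instead of ZooKeeper, per the 0.8.2 consumer config properties.
public class OffsetStorageConfig {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder address
        props.put("group.id", "delayed-consumer");        // hypothetical group
        props.put("auto.commit.enable", "false");         // commit explicitly, as in the question
        props.put("offsets.storage", "kafka");            // store offsets in Kafka, not ZK
        props.put("dual.commit.enabled", "false");        // don't also write offsets to ZK
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("offsets.storage")); // kafka
    }
}
```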

  2. Can consumer.commitOffsets throw an exception? if yes, I will consume the same message twice (can solve with idempotent messages)

I believe it can, if it is not able to communicate with the offset storage for instance. Using idempotent messages solves this problem though, as you say.
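One common way to make processing idempotent is to track already-handled message ids, so a redelivery after a failed commit becomes a no-op. A minimal in-memory sketch (the message-id field is an assumption; any unique per-message key works, and a real system would persist the seen set):

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Idempotent handling: if a commit fails and the same message is
// redelivered, a processed-id set makes the second delivery a no-op.
public class IdempotentHandler {
    private final Set<String> seen = new HashSet<>();
    final Queue<String> effects = new ArrayDeque<>(); // stands in for real side effects

    void handle(String messageId, String payload) {
        if (!seen.add(messageId)) {
            return; // duplicate delivery after a failed commit: skip
        }
        effects.add(payload); // the actual processing happens exactly once
    }

    public static void main(String[] args) {
        IdempotentHandler h = new IdempotentHandler();
        h.handle("m-1", "charge customer");
        h.handle("m-1", "charge customer"); // redelivered after commit failure
        System.out.println(h.effects.size()); // 1
    }
}
```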

  3. Problem waiting a long time without committing the offset, for example when the delay period is 24 hours: get next from iterator, sleep for 24 hours, process and commit (ZK session timeout?)

This won't be a problem with the above outlined solution unless the processing of the message itself takes more than the session timeout.

  4. How can the ZK session keep alive without committing new offsets? (setting a high zookeeper.session.timeout.ms can result in a dead consumer without recognizing it)

Again with the above you shouldn't need to set a long session timeout.

  5. Any other problems I'm missing?

There always are ;)

answered Sep 18 '22 by Emil H


Use Tibco EMS or another JMS queue; they have retry delay built in. Kafka may not be the right design choice for what you are doing.

answered Sep 19 '22 by Dhyan