
Delayed Queue implementation in Storm – Kafka, Cassandra, Redis or Beanstalk?

I have a Storm topology that processes messages from Kafka and, depending on the task in hand, makes an HTTP call or saves to Cassandra. I process the messages as soon as they arrive. However, a few messages are not processed completely because of the response from external sources such as an HTTP server. I would like to implement an exponential backoff mechanism for retries in case the HTTP server does not respond or returns an error message saying to retry after some time. I can think of a few ways to achieve this. I would like to know which of them is the better solution, and whether there is any other fault-tolerant solution I could use. Since this implements an exponential backoff, each message will have a different delay time.

  • Send it to another topic in Kafka which is consumed later. This is my preferred solution. I know we can use the Kafka offset to consume the message at a later stage. However, I could not find documentation or sample code for doing so. It would be really helpful if anyone could help me out with this.
  • Write the message to Cassandra / Redis and write a scheduler that fetches the messages which are not yet processed and are ready to be consumed, and sends them to Kafka so that my Storm topology can consume them. (Existing solution in another legacy, non-Storm project.)
  • Send to Beanstalk with a delay. (Existing solution in another legacy, non-Storm project. However, I would like to avoid this solution and use it only if I am out of options.)
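
To make the second option concrete, here is a minimal sketch of the "store with a ready time, then poll for what is due" idea. It is only an illustration under assumptions: the class `RetryStoreSketch` and its methods are hypothetical names, the in-memory `PriorityQueue` stands in for Cassandra / Redis (so it is not fault tolerant), and the caller passes the current time explicitly. In a real setup the scheduler would publish whatever `pollReady` returns back to Kafka.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical in-memory stand-in for a durable retry store (Cassandra / Redis).
final class RetryStoreSketch {

    // One pending message together with the time at which it becomes ready.
    private static final class Entry {
        final String message;
        final long readyAtMs;

        Entry(String message, long readyAtMs) {
            this.message = message;
            this.readyAtMs = readyAtMs;
        }
    }

    // Entries ordered by ready time, earliest first.
    private final PriorityQueue<Entry> pending =
            new PriorityQueue<Entry>(Comparator.comparingLong((Entry e) -> e.readyAtMs));

    // Record a message that should be retried delayMs after nowMs.
    void schedule(String message, long nowMs, long delayMs) {
        pending.add(new Entry(message, nowMs + delayMs));
    }

    // Remove and return every message whose ready time is at or before nowMs.
    List<String> pollReady(long nowMs) {
        List<String> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().readyAtMs <= nowMs) {
            ready.add(pending.poll().message);
        }
        return ready;
    }
}
```

Because every message carries its own ready time, messages with different backoff delays can share one store; the scheduler only needs to call `pollReady` periodically.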

The question "Kafka - Delayed Queue implementation using high level consumer" describes pretty much what I would like to do, but I am not able to find documentation on how to implement the delayProcessingUntil it mentions.

In the past I have used both a scheduled job reading from a data store and a delay using Beanstalk, but I would prefer to use Kafka.

Ankit Gupta asked Oct 30 '22 09:10



1 Answer

The Kafka spout has exponential backoff message retry built in. You can configure the initial delay, the delay multiplier and the maximum delay through the spout configuration. If there is an error in the bolt, call collector.fail(input); after that, just leave it to the spout to do the retry.

https://github.com/apache/storm/blob/v0.10.0/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
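
To show how the three settings interact, here is a small standalone sketch of a capped exponential backoff calculation. It is not the Storm source: `BackoffSketch` and `delayMs` are hypothetical names, and the formula (initial delay times the multiplier raised to the retry number minus one, capped at the maximum) is an assumption about the usual shape of such a schedule. In storm-kafka 0.10.x the corresponding settings appear to be the `SpoutConfig` fields `retryInitialDelayMs`, `retryDelayMultiplier` and `retryDelayMaxMs`; check the linked class for your version.

```java
// Illustrative sketch only; not the Storm implementation. All names are hypothetical.
final class BackoffSketch {

    /**
     * Delay before retry number retryNum (1-based):
     * initialDelayMs * multiplier^(retryNum - 1), capped at maxDelayMs.
     */
    static long delayMs(long initialDelayMs, double multiplier, long maxDelayMs, int retryNum) {
        if (retryNum < 1) {
            throw new IllegalArgumentException("retryNum must be >= 1");
        }
        double raw = initialDelayMs * Math.pow(multiplier, retryNum - 1);
        // Guard against overflow before converting to long.
        long delay = raw >= (double) Long.MAX_VALUE ? Long.MAX_VALUE : (long) raw;
        return Math.min(delay, maxDelayMs);
    }

    public static void main(String[] args) {
        // Example values (assumptions): 1 s initial delay, doubling, 60 s cap.
        for (int retry = 1; retry <= 8; retry++) {
            System.out.println("retry " + retry + " -> "
                    + delayMs(1000L, 2.0, 60000L, retry) + " ms");
        }
    }
}
```

With these example values the delay doubles from 1000 ms on the first retry until it reaches the 60000 ms cap on the seventh.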

hobgoblin answered Nov 15 '22 03:11
