Delaying Kafka Streams consuming

I'm trying to use Kafka Streams (i.e., not a plain Kafka Consumer) to read from a retry topic containing events that previously failed to process. I want to consume from the retry topic and, if processing still fails (for example, because an external system is down), put the event back on the retry topic. So I don't want to keep consuming immediately; instead I want to wait a while before consuming again, so as not to flood the systems with messages that are temporarily unprocessable.

In simplified form, the code currently looks like this, and I want to add a delay to it:

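import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed

// ArchivalData, ArchivalDataSerde, enrich(), archive() and logger are defined elsewhere
fun createTopology(topic: String): Topology {
    val streamsBuilder = StreamsBuilder()

    streamsBuilder.stream<String, ArchivalData>(topic, Consumed.with(Serdes.String(), ArchivalDataSerde()))
        .peek { key, msg -> logger.info("Received event for key $key : $msg") }
        // mapValues rather than map, assuming enrich() returns the new value, not a KeyValue pair
        .mapValues { msg -> enrich(msg) }
        .foreach { _, enrichedMsg -> archive(enrichedMsg) }

    return streamsBuilder.build()
}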

I have tried to use Window Delay to set this up, but have not managed to get it to work. I could of course call sleep inside a peek, but that would block the stream thread, which does not seem like a clean solution.

The exact details of how the delay works are not terribly important for my use case. For example, any of these would work fine:

  1. All events that arrived on the topic in the past x seconds are consumed at once. After it starts (or finishes) consuming, the stream waits x seconds before consuming again
  2. Every event is processed x seconds after being put on the topic
  3. The stream consumes messages with a delay of x seconds between every event

I would be very grateful if someone could provide a few lines of Kotlin or Java code that would accomplish any of the above.

asked Dec 09 '19 at 13:12 by sjoblomj


1 Answer

You cannot really pause reading from the input topic in Kafka Streams. The only way to "delay" would be to call sleep, but, as you mentioned, that blocks the whole stream thread and is not a good solution.

However, what you can do is use a stateful processor, e.g., process() (with an attached state store) instead of foreach(). If the retry fails, you don't put the record back into the input topic; instead, you put it into the store and register a punctuation with the desired retry delay. When the punctuation fires, you retry: if the retry succeeds, you delete the entry from the store and cancel the punctuation; otherwise, you wait until the punctuation fires again.
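A minimal Kotlin sketch of the approach above, assuming Kafka Streams 3.3 or later (the `org.apache.kafka.streams.processor.api` Processor API used via `KStream.process`). `RetryingProcessor`, `createRetryTopology`, `tryHandle` and the store name `retry-store` are hypothetical names for illustration; `tryHandle` stands in for the question's `enrich`/`archive` logic and returns `true` on success. The sketch also assumes non-null keys and keeps one parked value per key. Because the `Cancellable` handles live only in memory while the store is fault-tolerant, `init()` re-schedules retries for any entries still in the store after a restart or rebalance.

```kotlin
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.processor.Cancellable
import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.api.Processor
import org.apache.kafka.streams.processor.api.ProcessorContext
import org.apache.kafka.streams.processor.api.ProcessorSupplier
import org.apache.kafka.streams.processor.api.Record
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.state.Stores
import java.time.Duration

// Hypothetical store name used by this sketch.
val RETRY_STORE = "retry-store"

// Hypothetical processor: tries each record once; on failure it parks the value in a
// state store and registers a wall-clock punctuation that retries it every retryDelay.
class RetryingProcessor<V>(
    private val retryDelay: Duration,
    // Hypothetical callback standing in for enrich()/archive(); returns true on success.
    private val tryHandle: (V) -> Boolean,
) : Processor<String, V, Void, Void> {
    private lateinit var context: ProcessorContext<Void, Void>
    private lateinit var store: KeyValueStore<String, V>

    // Punctuation handles are kept in memory only.
    private val pending = mutableMapOf<String, Cancellable>()

    override fun init(context: ProcessorContext<Void, Void>) {
        this.context = context
        store = context.getStateStore(RETRY_STORE)
        // After a restart or rebalance the store may still hold parked records,
        // but their punctuations are gone, so schedule them again.
        store.all().use { entries -> entries.forEach { entry -> scheduleRetry(entry.key) } }
    }

    override fun process(record: Record<String, V>) {
        val key = record.key() ?: return // this sketch assumes non-null keys
        if (!tryHandle(record.value())) {
            // Park the record instead of writing it back to the input topic.
            // Assumption: one parked value per key; a later failure overwrites it.
            store.put(key, record.value())
            scheduleRetry(key)
        }
    }

    private fun scheduleRetry(key: String) {
        if (key in pending) return // a retry is already scheduled for this key
        // WALL_CLOCK_TIME fires on system time, so retries happen even when no new
        // records arrive (STREAM_TIME only advances with new input).
        pending[key] = context.schedule(retryDelay, PunctuationType.WALL_CLOCK_TIME) { _ ->
            val value = store.get(key)
            if (value == null || tryHandle(value)) {
                // Success (or nothing left to retry): drop the entry and stop retrying.
                store.delete(key)
                pending.remove(key)?.cancel()
            }
            // Otherwise keep the entry; the punctuation fires again after retryDelay.
        }
    }
}

// Hypothetical wiring: process() with an attached store replaces foreach().
fun <V> createRetryTopology(
    topic: String,
    valueSerde: Serde<V>,
    retryDelay: Duration,
    tryHandle: (V) -> Boolean,
): Topology {
    val builder = StreamsBuilder()
    builder.addStateStore(
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(RETRY_STORE), Serdes.String(), valueSerde),
    )
    builder.stream(topic, Consumed.with(Serdes.String(), valueSerde))
        .process(ProcessorSupplier<String, V, Void, Void> { RetryingProcessor(retryDelay, tryHandle) }, RETRY_STORE)
    return builder.build()
}
```

With the question's types, the wiring might look like `createRetryTopology(topic, ArchivalDataSerde(), Duration.ofSeconds(30)) { msg -> runCatching { archive(enrich(msg)) }.isSuccess }`, assuming `ArchivalDataSerde` is a `Serde<ArchivalData>` and that a failed `archive` call throws. The per-key punctuation mirrors the description above; a common variant is a single punctuation registered in `init()` that scans the whole store on each tick.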

answered Oct 13 '22 at 00:10 by Matthias J. Sax