 

How to handle errors and avoid committing when using the Kafka Streams DSL

With Kafka Streams, if we use the lower-level Processor API, we can control whether to commit or not. So if a problem happens in our code, we can skip committing the message, and Kafka will redeliver it until the problem gets fixed.

But how can we control whether a message is committed when using the higher-level Streams DSL API?

Resources:

http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html

asked Feb 05 '17 by jeffery.yuan


1 Answer

Your statement is not completely true. You cannot "control to commit or not" -- at least not directly (neither in the Processor API nor in the DSL). You can only use ProcessorContext#commit() to request an additional commit. Thus, after a call to #commit(), Streams tries to commit as soon as possible, but it is not an immediate commit. Furthermore, Streams commits automatically even if you never call #commit(). You can control the commit interval via the Streams configuration commit.interval.ms (cf. http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application)
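As a rough sketch, shortening the commit interval looks like the configuration below. commit.interval.ms is a real Streams config key; the application id, bootstrap server, and the value 100 are placeholder/illustrative values (note this only makes commits more frequent, it does not make them synchronous):

```java
import java.util.Properties;

public class CommitIntervalConfig {
    public static Properties streamsConfig() {
        Properties props = new Properties();
        // placeholder values for a local setup
        props.put("application.id", "my-streams-app");
        props.put("bootstrap.servers", "localhost:9092");
        // commit.interval.ms controls how often Streams commits offsets
        // (default 30000 ms); it does NOT turn commits into immediate,
        // per-record commits
        props.put("commit.interval.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("commit.interval.ms"));
    }
}
```

These Properties would then be passed to the KafkaStreams constructor as usual.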

In case of a "problem", how to respond depends on the type of problem you have:

  • if you detect a problem you cannot recover from, you can only throw an exception and "stop the world" (cf. below).
  • if you have a recoverable error, you need to "loop" within your own code (e.g., within Processor#process() or KeyValueMapper#apply()) until the problem is resolved and you can successfully process the current message (note that you might run into a timeout, i.e., an exception, using this strategy -- cf. consumer configs heartbeat.interval.ms and, for 0.10.1, session.timeout.ms [KIP-62])
  • an alternative would be to put records that cannot be processed right now into a StateStore and process them later on. However, this is hard to get right and also breaks a few Streams assumptions (e.g., processing order). It is not recommended, and if used, you must be very careful about the implications
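The "loop within your own code" option above can be sketched, independently of any Kafka APIs, as a generic retry helper that you would call from inside Processor#process(). The name processWithRetry, the attempt limit, and the backoff are illustrative choices, not part of the Streams API; in real code the total retry time must stay below the consumer timeouts mentioned above:

```java
import java.util.function.Supplier;

public class RetryLoop {
    // Retries the supplied per-record action until it succeeds or
    // maxAttempts is exhausted. A long-running loop risks the consumer
    // session timeout, so bound both attempts and backoff carefully.
    public static <T> T processWithRetry(Supplier<T> action, int maxAttempts,
                                         long backoffMs) throws InterruptedException {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;                 // recoverable error: remember and retry
                Thread.sleep(backoffMs);  // simple fixed backoff
            }
        }
        throw last;                       // give up: "stop the world"
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        String result = processWithRetry(() -> {
            if (++calls[0] < 3) throw new RuntimeException("transient failure");
            return "processed";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```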

If there is an uncaught exception, the StreamThread will die and no commit happens (you can register an exception handler to get notified about this: http://docs.confluent.io/current/streams/developer-guide.html#using-kafka-streams-within-your-application-code). If all your StreamThreads died, you will need to create a new instance of KafkaStreams to restart your application.
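KafkaStreams#setUncaughtExceptionHandler accepts a plain Thread.UncaughtExceptionHandler, so the sketch below demonstrates the same interface on an ordinary thread (no broker needed); the thread body and messages are illustrative only:

```java
public class HandlerDemo {
    public static void main(String[] args) throws InterruptedException {
        final StringBuilder log = new StringBuilder();

        // Stands in for a StreamThread that dies with an uncaught exception.
        Thread worker = new Thread(() -> {
            throw new RuntimeException("stream thread died");
        });

        // Same interface you would pass to KafkaStreams#setUncaughtExceptionHandler;
        // in a real app you would log/alert here and arrange for a new
        // KafkaStreams instance to be created to restart the application.
        worker.setUncaughtExceptionHandler((thread, error) ->
            log.append("caught: ").append(error.getMessage()));

        worker.start();
        worker.join();
        System.out.println(log);
    }
}
```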

You must not return from user code before a message was successfully processed, because if you return, Streams assumes the message was successfully processed (and thus might commit the corresponding offset). With regard to the third bullet point above, putting a record into a special StateStore for later processing counts as "successfully" processing it.

answered Sep 18 '22 by Matthias J. Sax