For Kafka Streams, if we use the lower-level Processor API, we can control whether to commit or not. If a problem happens in our code and we don't want to commit the message, Kafka will redeliver it, possibly multiple times, until the problem gets fixed.
But how can we control whether a message gets committed when using the higher-level Streams DSL API?
Resources:
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html
Your statement is not completely true. You cannot "control to commit or not", at least not directly (neither in the Processor API nor in the DSL). You can only use ProcessorContext#commit() to request additional commits. After a call to #commit(), Streams tries to commit as soon as possible, but the commit is not immediate. Furthermore, Streams commits automatically even if you never call #commit(). You can control the Streams commit interval via the Streams configuration commit.interval.ms (cf. http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application).
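To make the commit interval concrete, here is a minimal sketch that builds the Properties a KafkaStreams instance would be created with. It uses only the JDK; the application id and broker address are placeholder assumptions, and the string key "commit.interval.ms" is the one behind StreamsConfig.COMMIT_INTERVAL_MS_CONFIG. Calling ProcessorContext#commit() inside a processor would only request an extra commit on top of this interval.

```java
import java.util.Properties;

// Minimal sketch: the configuration a KafkaStreams instance might be created with.
// "commit.interval.ms" is the key of StreamsConfig.COMMIT_INTERVAL_MS_CONFIG;
// the application id and broker address are placeholders (assumptions).
final class StreamsCommitConfig {
    private StreamsCommitConfig() {}

    static Properties build(long commitIntervalMs) {
        Properties props = new Properties();
        props.put("application.id", "commit-demo-app");    // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        // How often Streams commits on its own, whether or not commit() is ever called.
        props.put("commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }
}
```

With kafka-streams on the classpath, these properties would be passed to new KafkaStreams(topology, props). A shorter interval narrows the range of records that may be reprocessed after a failure, at the cost of more frequent commits.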
In case of a "problem", how to respond depends on the type of problem you have:
(1) If the problem is not recoverable, you can only throw an exception and let the StreamThread stop (see below).
(2) If the problem is recoverable, retry inside your own code, for example within Processor#process() or KeyValueMapper#apply(), until the problem is resolved and you can successfully process the current message. Note that you might run into a timeout (i.e., an exception) with this strategy; cf. the consumer configs heartbeat.interval.ms and, for 0.10.1, session.timeout.ms [KIP-62].
(3) Put records that cannot be processed right now into a StateStore and process them later on. However, this is hard to get right and also breaks a few Streams assumptions (e.g., processing order). It is not recommended, and if you use it, you must be very careful about the implications.

If there is an uncaught exception, the StreamThread will die and no commit happens (you can register an exception handler to get notified about this: http://docs.confluent.io/current/streams/developer-guide.html#using-kafka-streams-within-your-application-code). If all your StreamThreads have died, you will need to create a new KafkaStreams instance to restart your application.
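The retry strategy described above (looping inside Processor#process() or KeyValueMapper#apply() until the message is processed, and otherwise letting an exception end the StreamThread) can be sketched with the JDK alone. RetryUntilProcessed and its parameters are hypothetical names, not part of Kafka Streams; maxWait is an assumed bound that you would choose to stay below the consumer timeouts mentioned above.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper (not a Kafka API): keeps retrying a recoverable step inside
// user code, e.g. the body of Processor#process() or KeyValueMapper#apply(),
// instead of returning before the message has actually been processed.
final class RetryUntilProcessed {
    private RetryUntilProcessed() {}

    // Calls attempt until it returns true and returns the number of attempts used.
    // If maxWait elapses first, it throws; inside Kafka Streams that uncaught
    // exception ends the StreamThread, so the current offset is not committed.
    static int run(BooleanSupplier attempt, Duration maxWait, Duration backoff) {
        long deadline = System.nanoTime() + maxWait.toNanos();
        int attempts = 0;
        while (true) {
            attempts++;
            if (attempt.getAsBoolean()) {
                return attempts; // only now is it safe to hand control back to Streams
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("gave up after " + attempts + " attempts");
            }
            try {
                Thread.sleep(backoff.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new IllegalStateException("interrupted while retrying", e);
            }
        }
    }
}
```

For example, RetryUntilProcessed.run(() -> tryWriteToExternalService(value), Duration.ofSeconds(10), Duration.ofMillis(200)) inside apply() would either return after a successful write or throw; tryWriteToExternalService is a hypothetical method. Giving up by throwing, rather than by returning, is what keeps the offset uncommitted.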
You must not return from user code before a message has been successfully processed: once you return, Streams assumes that the message was successfully processed (and thus might commit the corresponding offset). With regard to bullet point (3), putting a record into a special StateStore for later processing counts as a "successfully" processed record.
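The StateStore approach can be simulated with the JDK to show why a deferred record already counts as processed and how processing order breaks. This is only a sketch: the LinkedHashMap stands in for a Kafka Streams key-value store, and DeferringHandler, handle(), and retryPending() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Stdlib simulation of the "park it in a StateStore" approach. The LinkedHashMap is a
// stand-in for a Kafka Streams key-value store; all names here are hypothetical.
final class DeferringHandler {
    private final Map<String, String> pending = new LinkedHashMap<>(); // stand-in StateStore
    private final List<String> output = new ArrayList<>();             // values handled so far
    private final Predicate<String> canProcessNow;

    DeferringHandler(Predicate<String> canProcessNow) {
        this.canProcessNow = canProcessNow;
    }

    // Called once per record. Returning normally from either branch means Streams
    // treats the record as processed and may commit its offset.
    void handle(String key, String value) {
        if (canProcessNow.test(value)) {
            output.add(value);
        } else {
            pending.put(key, value); // deferred, yet "successfully" processed from Streams' view
        }
    }

    // Run later (in Kafka Streams this could be a scheduled punctuation) to retry deferred records.
    void retryPending() {
        Iterator<Map.Entry<String, String>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (canProcessNow.test(entry.getValue())) {
                output.add(entry.getValue());
                it.remove();
            }
        }
    }

    List<String> output() { return output; }

    int pendingCount() { return pending.size(); }
}
```

If record A is deferred and record B then succeeds, the output reads B before A once retryPending() runs, which is the processing-order break mentioned above. The sketch also shows other pitfalls: a second deferred record with the same key overwrites the first, and a real store would have to survive restarts, which this map does not.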