I have a basic stream processing flow which looks like
master topic -> my processing in a mapper/filter -> output topics
and I am wondering about the best way to handle "bad messages". These could be things like messages that I can't deserialize properly, or cases where the processing/filtering logic fails in some unexpected way (I have no external dependencies, so there should be no transient errors of that sort).
I was considering wrapping all my processing/filtering code in a try catch and if an exception was raised then routing to an "error topic". Then I can study the message and modify it or fix my code as appropriate and then replay it on to master. If I let any exceptions propagate, the stream seems to get jammed and no more messages are picked up.
For completeness here is my code (pseudo-ish):
class Document {
    // Fields
}

class AnalysedDocument {
    Document document;
    String rawValue;
    Exception exception;
    Analysis analysis;

    // All being well
    AnalysedDocument(Document document, Analysis analysis) {...}
    // Analysis failed
    AnalysedDocument(Document document, Exception exception) {...}
    // Deserialisation failed
    AnalysedDocument(String rawValue, Exception exception) {...}
}

KStreamBuilder builder = new KStreamBuilder();

KStream<String, AnalysedDocument> analysedDocumentStream = builder
    .stream(Serdes.String(), Serdes.String(), "master")
    .mapValues(new ValueMapper<String, AnalysedDocument>() {
        @Override
        public AnalysedDocument apply(String rawValue) {
            Document document;
            try {
                // Deserialise
                document = ...
            } catch (Exception e) {
                return new AnalysedDocument(rawValue, e);
            }
            try {
                // Perform analysis
                Analysis analysis = ...
                return new AnalysedDocument(document, analysis);
            } catch (Exception e) {
                return new AnalysedDocument(document, e);
            }
        }
    });

// Branch based on whether the analysis mapping failed
KStream<String, AnalysedDocument>[] branches = analysedDocumentStream.branch(
    (key, value) -> value.exception != null,   // error records
    (key, value) -> true);                      // successful records
KStream<String, AnalysedDocument> errorStream = branches[0];
KStream<String, AnalysedDocument> successStream = branches[1];

errorStream.to(Serdes.String(), customPojoSerde(), "error");
successStream.to(Serdes.String(), customPojoSerde(), "analysed");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Any help greatly appreciated.
As a first point: if you only have a producer producing messages, you don't need Kafka Streams at all. And if you consume messages from one Kafka cluster but publish to topics on a different Kafka cluster, a single Streams application cannot do that by itself; you can still use Kafka Streams for the processing, but you have to use a separate Producer to publish the messages to the other cluster.
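A rough sketch of that second case, assuming a hypothetical target cluster at cluster-b:9092 and a hypothetical output topic named "mirrored":

Properties clusterBProps = new Properties();
clusterBProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cluster-b:9092");
clusterBProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
clusterBProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
Producer<String, String> clusterBProducer = new KafkaProducer<>(clusterBProps);

// the Streams application itself only talks to the source cluster;
// results are forwarded to the second cluster by hand
builder.stream(Serdes.String(), Serdes.String(), "master")
       .foreach((key, value) -> clusterBProducer.send(new ProducerRecord<>("mirrored", key, value)));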
Kafka broker configuration: an optional property, message.max.bytes, can be used to allow all topics on a broker to accept messages greater than 1 MB in size. It holds the size of the largest record batch allowed by Kafka after compression (if compression is enabled).
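If only a single topic (rather than every topic on the broker) needs to accept larger messages, the topic-level counterpart max.message.bytes can be raised instead, for example via the AdminClient; the topic name and the ~2 MB limit below are placeholders:

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(adminProps)) {
    ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "master");
    AlterConfigOp raiseLimit = new AlterConfigOp(
            new ConfigEntry("max.message.bytes", "2097152"),  // ~2 MB
            AlterConfigOp.OpType.SET);
    Collection<AlterConfigOp> ops = Collections.singletonList(raiseLimit);
    admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
} catch (InterruptedException | ExecutionException e) {
    // the alteration failed or was interrupted; log and handle as appropriate
}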
Right now, Kafka Streams offers only limited error handling capabilities. There is work in progress to simplify this. For now, your overall approach seems to be a good way to go.
One comment about handling de/serialization errors: handling those errors manually requires you to do the de/serialization "manually" as well. This means you need to configure ByteArraySerdes for the key and value of the input/output topics of your Streams app and add a map() that does the de/serialization (i.e., KStream<byte[],byte[]> -> map() -> KStream<keyType,valueType> -- or the other way round if you also want to catch serialization exceptions). Otherwise, you cannot try-catch deserialization exceptions.
With your current approach, you "only" validate that the given string represents a valid document -- but it could be the case that the message itself is corrupted and cannot be converted into a String in the source operator in the first place. Thus, your code doesn't actually cover deserialization exceptions. However, if you are sure a deserialization exception can never happen, your approach would be sufficient, too.
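To make that concrete, here is a rough sketch of the byte-array pattern, reusing the classes from the question; deserializeDocument() and analyse() are hypothetical helpers standing in for your actual deserialization and analysis code:

// consume raw bytes so that nothing can fail inside the source operator itself
KStream<byte[], byte[]> rawStream = builder.stream(Serdes.ByteArray(), Serdes.ByteArray(), "master");

KStream<String, AnalysedDocument> analysedStream = rawStream.map((keyBytes, valueBytes) -> {
    String key = keyBytes == null ? null : new String(keyBytes, StandardCharsets.UTF_8);
    String rawValue = valueBytes == null ? null : new String(valueBytes, StandardCharsets.UTF_8);
    try {
        Document document = deserializeDocument(valueBytes);                            // hypothetical helper
        return KeyValue.pair(key, new AnalysedDocument(document, analyse(document)));   // hypothetical helper
    } catch (Exception e) {
        // covers corrupted bytes as well as documents that fail analysis
        return KeyValue.pair(key, new AnalysedDocument(rawValue, e));
    }
});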
Update
This issue is tackled via KIP-161 and will be included in the next release, 1.0.0. It allows you to register a callback via the parameter default.deserialization.exception.handler. The handler will be invoked every time an exception occurs during deserialization and allows you to return a DeserializationHandlerResponse -- either CONTINUE (drop the record and move on) or FAIL (the default).
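For illustration, a minimal sketch of how this can be wired up in 1.0.0 -- either register the built-in LogAndContinueExceptionHandler, or implement the callback yourself (the class name SkipCorruptedRecordsHandler below is just an example):

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class SkipCorruptedRecordsHandler implements DeserializationExceptionHandler {
    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // e.g. log record.topic(), record.partition(), record.offset(), then skip the record
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}

// register the handler in the Streams config from the question
config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
           SkipCorruptedRecordsHandler.class);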
Update 2
With KIP-210 (which will be part of Kafka 1.1) it is also possible to handle errors on the producer side, similar to the consumer part, by registering a ProductionExceptionHandler via the config default.production.exception.handler that can return CONTINUE.
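For example, a handler that skips records that are too large for the producer but fails on anything else might look roughly like this:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;  // drop the oversized record
        }
        return ProductionExceptionHandlerResponse.FAIL;          // keep the default behaviour otherwise
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}

// register the handler in the Streams config
config.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
           IgnoreRecordTooLargeHandler.class);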