Handling bad messages using Kafka's Streams API

I have a basic stream processing flow which looks like

master topic -> my processing in a mapper/filter -> output topics 

and I am wondering about the best way to handle "bad messages". These could be messages that I can't deserialize properly, or cases where the processing/filtering logic fails in some unexpected way (I have no external dependencies, so there should be no transient errors of that sort).

I was considering wrapping all my processing/filtering code in a try/catch and, if an exception is raised, routing the message to an "error topic". Then I can study the message, modify it or fix my code as appropriate, and replay it onto master. If I let any exceptions propagate, the stream seems to get jammed and no more messages are picked up.

  • Is this approach considered best practice?
  • Is there a convenient Kafka streams way to handle this? I don't think there is a concept of a DLQ...
  • What are the alternative ways to stop Kafka jamming on a "bad message"?
  • What alternative error handling approaches are there?

For completeness here is my code (pseudo-ish):

class Document {
    // Fields
}

class AnalysedDocument {
    Document document;
    String rawValue;
    Exception exception;
    Analysis analysis;

    // All being well
    AnalysedDocument(Document document, Analysis analysis) {...}

    // Analysis failed
    AnalysedDocument(Document document, Exception exception) {...}

    // Deserialisation failed
    AnalysedDocument(String rawValue, Exception exception) {...}
}

KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedDocument> analysedDocumentStream = builder
    .stream(Serdes.String(), Serdes.String(), "master")
    .mapValues(new ValueMapper<String, AnalysedDocument>() {
        @Override
        public AnalysedDocument apply(String rawValue) {
            Document document;
            try {
                // Deserialise
                document = ...
            } catch (Exception e) {
                return new AnalysedDocument(rawValue, e);
            }
            try {
                // Perform analysis
                Analysis analysis = ...
                return new AnalysedDocument(document, analysis);
            } catch (Exception e) {
                return new AnalysedDocument(document, e);
            }
        }
    });

// Branch based on whether analysis mapping failed to produce errorStream and successStream
errorStream.to(Serdes.String(), customPojoSerde(), "error");
successStream.to(Serdes.String(), customPojoSerde(), "analysed");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
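The branch step I have in mind is roughly this (just a sketch; it assumes the error case can be detected by checking whether the exception field is set):

KStream<String, AnalysedDocument>[] branches = analysedDocumentStream.branch(
    (key, value) -> value.exception != null,  // deserialisation or analysis failed
    (key, value) -> true                      // everything else
);
KStream<String, AnalysedDocument> errorStream = branches[0];
KStream<String, AnalysedDocument> successStream = branches[1];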

Any help greatly appreciated.

Asked Mar 08 '17 by bm1729



1 Answer

Right now, Kafka Streams offers only limited error handling capabilities. There is work in progress to simplify this. For now, your overall approach seems to be a good way to go.

One comment about handling de/serialization errors: handling those errors manually requires you to do the de/serialization "manually" as well. This means you need to configure byte-array Serdes for the key and value of the input/output topics of your Streams app and add a map() that does the de/serialization (i.e., KStream<byte[],byte[]> -> map() -> KStream<keyType,valueType>, or the other way round if you also want to catch serialization exceptions). Otherwise, you cannot try-catch deserialization exceptions.
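A rough sketch of that idea against your example (deserialize() and analyse() are hypothetical stand-ins for your "// Deserialise" and "// Perform analysis" steps):

KStreamBuilder builder = new KStreamBuilder();

// Consume raw bytes so that a corrupt record cannot kill the source operator
KStream<byte[], byte[]> rawStream =
    builder.stream(Serdes.ByteArray(), Serdes.ByteArray(), "master");

KStream<String, AnalysedDocument> analysedDocumentStream = rawStream.map((keyBytes, valueBytes) -> {
    String key = keyBytes == null ? null : new String(keyBytes);
    String rawValue = valueBytes == null ? null : new String(valueBytes);
    Document document;
    try {
        document = deserialize(rawValue);        // hypothetical helper
    } catch (Exception e) {
        return KeyValue.pair(key, new AnalysedDocument(rawValue, e));
    }
    try {
        Analysis analysis = analyse(document);   // hypothetical helper
        return KeyValue.pair(key, new AnalysedDocument(document, analysis));
    } catch (Exception e) {
        return KeyValue.pair(key, new AnalysedDocument(document, e));
    }
});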

With your current approach, you "only" validate that the given string represents a valid document -- but it could be the case that the message itself is corrupted and cannot be converted into a String in the source operator in the first place. Thus, your code doesn't actually cover deserialization exceptions. However, if you are sure a deserialization exception can never happen, your approach would be sufficient, too.

Update

This issue is tackled via KIP-161 and will be included in the next release, 1.0.0. It allows you to register a callback via the parameter default.deserialization.exception.handler. The handler will be invoked every time an exception occurs during deserialization and allows you to return a DeserializationHandlerResponse (CONTINUE -> drop the record and move on, or FAIL, which is the default).
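For example, once on 1.0.0, skipping over corrupted records becomes a config change (minimal sketch, using the built-in LogAndContinueExceptionHandler from org.apache.kafka.streams.errors):

Properties config = new Properties();
// ... other Streams settings ...
config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
           LogAndContinueExceptionHandler.class);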

Update 2

With KIP-210 (will be part of Kafka 1.1) it's also possible to handle errors on the producer side, similar to the consumer part, by registering a ProductionExceptionHandler via the config default.production.exception.handler that can return CONTINUE.
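A sketch of what such a handler could look like (assuming, for illustration, that you only want to swallow RecordTooLargeException and fail on everything else):

public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        // Skip over-sized records, fail fast on anything else
        return exception instanceof RecordTooLargeException
                ? ProductionExceptionHandlerResponse.CONTINUE
                : ProductionExceptionHandlerResponse.FAIL;
    }
}

// Register it:
config.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
           IgnoreRecordTooLargeHandler.class);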

Answered by Matthias J. Sax