 

Kafka Streams error: SerializationException: Size of data received by LongDeserializer is not 8

I am trying out Kafka Streams by writing a simple application that counts duplicate messages.

Message:

2019-02-27-11:16:56 :: session:prod-111656 :: Msg => Hello World: 2491
2019-02-27-11:16:56 :: session:prod-111656 :: Msg => Hello World: 2492

etc.

I am trying to split each such message, using session:prod-xxxx as the key and session:prod-xxxx + Hello World: xxxx as the value, then group by key and see which messages were duplicated within each session.

Here's the code:

KStream<String, String> textLines = builder.stream("RegularProducer");

KTable<String, Long> ktable = textLines
    .map((String key, String value) -> {
        try {
            String[] parts = value.split("::");
            String sessionId = parts[1];
            String message = parts[2].split("=>")[1];
            message = sessionId + ":" + message;
            return new KeyValue<>(sessionId.trim().toLowerCase(), message.trim().toLowerCase());
        } catch (Exception e) {
            return new KeyValue<>("Invalid-Message".trim().toLowerCase(), "Invalid Message".trim().toLowerCase());
        }
    })
    .groupBy((key, value) -> value)
    .count()
    .filter((String key, Long value) -> value > 1);

ktable.toStream().to("RegularProducerDuplicates",
        Produced.with(Serdes.String(), Serdes.Long()));

Topology topology = builder.build();
topology.describe();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

The KTable topic RegularProducerDuplicates gets produced. But when I use the console consumer to view it, it crashes with an error. If I then pass the --skip-message-on-error flag to the console consumer, I see thousands of lines like these:

session:prod-111656 : hello world: 994  [2019-02-28 16:25:18,081] ERROR Error processing message, skipping this message:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

Can anyone help me figure out what's going wrong here?

asked by Shades88


2 Answers

Your Kafka Streams application is ok and works properly.

The bug is in kafka-console-consumer (kafka.tools.ConsoleConsumer is the class that implements the logic behind that script).

It doesn't properly handle null during deserialization. When it gets null as the value or key of a message, it substitutes a default value (an array of bytes representing the string "null"). If you check the source code, you can find the following function:

def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte]) {
  val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
  val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString.
    getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
  output.write(convertedBytes)
}

As you can see, when the sourceBytes it receives for deserialization is null (sourceBytes == null), it sets a default value for it:

val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))

In your case that default is "null".getBytes(StandardCharsets.UTF_8). Those bytes are then passed to org.apache.kafka.common.serialization.LongDeserializer (your value deserializer). LongDeserializer checks the size of the byte array right at the start; here it is 4 (the length of the string "null"), so an exception is thrown.

If you use StringDeserializer instead, for example, it will not deserialize the value meaningfully, but at least it won't throw an exception, because it doesn't check the length of the byte array.
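
To make this concrete, here is a minimal sketch (assuming the Kafka clients jar is on the classpath; the class name and topic string are just placeholders) of how the two deserializers react to that 4-byte "null" placeholder:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NullPlaceholderDemo {
    public static void main(String[] args) {
        // The console consumer substitutes these 4 bytes when a record's value is null.
        byte[] nonNullBytes = "null".getBytes(StandardCharsets.UTF_8);

        // StringDeserializer ignores the length and simply yields the string "null".
        System.out.println(new StringDeserializer().deserialize("some-topic", nonNullBytes));

        // LongDeserializer requires exactly 8 bytes, so this call throws
        // org.apache.kafka.common.errors.SerializationException:
        // "Size of data received by LongDeserializer is not 8"
        new LongDeserializer().deserialize("some-topic", nonNullBytes);
    }
}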

Long story short: the ConsoleConsumer formatter that is responsible for printing substitutes a default value for null, and that value can't be handled by some deserializers (LongDeserializer, IntegerDeserializer).

As for why your application produces null values for some keys:

KTable::filter has different semantics than KStream::filter. According to the javadoc for KTable:

for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.

For your filter, whenever the count is <= 1, a null value (tombstone) is forwarded for that key.
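
If the goal is to keep those tombstones out of the output topic entirely, one option is to drop the null values on the stream side before producing. The snippet below is only a sketch built on the ktable from the question's code:

// Instead of writing the filtered KTable directly, drop tombstone records first.
ktable.toStream()
      .filter((key, value) -> value != null)   // skip the null (tombstone) values
      .to("RegularProducerDuplicates", Produced.with(Serdes.String(), Serdes.Long()));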

answered by Bartosz Wardziński


The deserializer used for the values may not be String; in your case it will be Long. When creating the consumer on the CLI, specify the deserializers explicitly. For example:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic name \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --skip-message-on-error \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Check the last two lines when creating the consumer and make sure they match the types of your keys and values. In my case both were strings; if the values are of type long, use the last line as: --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
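
If the console consumer keeps being awkward, another way to inspect the topic is a small Java consumer with matching deserializers. The sketch below assumes the broker address localhost:9092 and an arbitrary group id:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DuplicateViewer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "duplicate-viewer");        // arbitrary group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

        try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("RegularProducerDuplicates"));
            ConsumerRecords<String, Long> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, Long> record : records) {
                // Tombstones from the KTable arrive here as null values.
                System.out.println(record.key() + " => " + record.value());
            }
        }
    }
}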

answered by sangeet singh