Exception when processing data during Kafka stream process

I am working with Kafka Streams using the code below. I check a filter condition on the JSON value of each record, keeping records where "UserID" is "1". Please refer to the code below:

builder.<String, String>stream(Serdes.String(), Serdes.String(), topic)
       .filter(new Predicate<String, String>() {

           String userIDCheck = null;

           @Override
           public boolean test(String key, String value) {
               try {
                   JSONObject jsonObj = new JSONObject(value);
                   userIDCheck = jsonObj.get("UserID").toString();
                   System.out.println("userIDCheck: " + userIDCheck);
               } catch (JSONException e) {
                   e.printStackTrace();
               }
               return "1".equals(userIDCheck);
           }
       })
       .to(streamouttopic);

value : {"UserID":"1","Address":"XXX","AccountNo":"989","UserName":"Stella","AccountType":"YYY"}

I get the below error:

    Exception in thread "SampleStreamProducer-4eecc3ab-858c-44a4-9b8c-5ece2b4ab21a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=testtopic1, partition=0, offset=270
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
    ... 3 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:89)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)

The value and the filter condition look fine in the stream code above, so I can't figure out why this exception is thrown when the stream code executes.

Stella asked Dec 23 '22 07:12
1 Answer

The reported issue only applies to Kafka 2.0 and older. Since the 2.1.0 release, Kafka Streams supports "serde push down", and the to() operator inherits the correct serdes from upstream (cf. https://issues.apache.org/jira/browse/KAFKA-7456).

For Kafka 2.0 and older, you have to specify the correct Serdes for the to() operation explicitly. Otherwise, it uses the default Serdes from the StreamsConfig, which is ByteArraySerde (because the semantics of serde overwrites are a per-operator "drop-in overwrite"), and a String cannot be cast to byte[].
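The cast failure at the bottom of the stack trace can be reproduced in isolation: ByteArraySerializer receives the record value as an Object and casts it to byte[], which fails when the value is actually a String. A minimal sketch of that mechanism (plain Java, no Kafka dependency; the JSON string is just illustrative):

```java
public class CastDemo {
    public static void main(String[] args) {
        // The record value flowing into the sink node is a String...
        Object value = "{\"UserID\":\"1\",\"UserName\":\"Stella\"}";

        // ...but ByteArraySerializer effectively performs this cast internally:
        try {
            byte[] bytes = (byte[]) value; // java.lang.ClassCastException
            System.out.println("serialized " + bytes.length + " bytes");
        } catch (ClassCastException e) {
            System.out.println("String cannot be cast to byte[]");
        }
    }
}
```

The cast compiles (the static type is Object) but fails at runtime, which is exactly why the error only surfaces once records start flowing through the topology.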

You need to do:

.to(streamouttopic, Produced.with(Serdes.String(), Serdes.String()));

For even older versions (pre-1.0) that don't have the Produced parameter yet, the code would be:

.to(Serdes.String(), Serdes.String(), streamouttopic);
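As the exception message itself suggests ("Change the default Serdes in StreamConfig or provide correct Serdes via method parameters"), an alternative is to make String serdes the application-wide default, so no per-operator override is needed. A configuration sketch for Kafka 1.0+ (pre-1.0 releases used the older keys StreamsConfig.KEY_SERDE_CLASS_CONFIG / VALUE_SERDE_CLASS_CONFIG; broker address and application id are placeholders):

```java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SampleStreamProducer");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Make String the default serde for all operators, so to() no longer
// falls back to ByteArraySerde:
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
```

Explicit per-operator serdes are still the safer choice when a single topology mixes value types.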
Matthias J. Sax answered May 17 '23 09:05