I am working on Kafka streams using the below code. I check a filter condition from a JSON obj for the condition if "UserID":"1"
. Please refer the code below
builder.<String,String>stream(Serdes.String(), Serdes.String(), topic)
.filter(new Predicate <String, String>() {
String userIDCheck = null;
@Override
public boolean test(String key, String value) {
try {
JSONObject jsonObj = new JSONObject(value);
userIDCheck = jsonObj.get("UserID").toString();
System.out.println("userIDCheck: " + userIDCheck);
} catch (JSONException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return userIDCheck.equals("1");
}
})
.to(streamouttopic);
value : {"UserID":"1","Address":"XXX","AccountNo":"989","UserName":"Stella","AccountType":"YYY"}
I get the below error:
Exception in thread "SampleStreamProducer-4eecc3ab-858c-44a4-9b8c-5ece2b4ab21a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=testtopic1, partition=0, offset=270
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
... 3 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:89)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
Value and condition are fine from above stream code, i couldn't get why it is giving this exception during when executing steam code.
The reported issue should only applies to Kafka 2.0 and older. Since 2.1.0 release, Kafka Streams supports "serde push down" and the to()
operator should inherit the correct serdes from upstream (cf https://issues.apache.org/jira/browse/KAFKA-7456).
For Kafka 2.0 and older, you have to specify the correct Serdes for the to()
operation, explicitly. Otherwise, it uses the default Serdes from the StreamsConfig
that is ByteArraySerde
(because the semantics or serde overwrite is a per operator "drop-in overwrite") -- and String
cannot be cast to byte[]
.
You need to do:
.to(streamoutputtopic, Produced.with(Serdes.String(), Serdes.String()));
For even older versions (pre 1.0) that don't use Produced
parameter yet, the code would be:
.to(Serdes.String(), Serdes.String(), streamoutputtopic);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With