Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring Kafka and Kafka Streams

In Spring Boot application I'm trying to configure Kafka Streams. With plain Kafka topics, everything is working fine, but I unable to get working Spring Kafka Streams.

This is my configuration:

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new StreamsConfig(props);
    }

    @Bean
    public KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {

        KStream<String, String> stream = kStreamBuilder.stream("post.sent");

        stream.mapValues(post -> post.toString()).to("streamingTopic2");

        stream.print();

        return stream;
    }

    @Bean
    public NewTopic kafkaTopicTest() {
        return new NewTopic("streamingTopic2", 1, (short) 1);
    }

    @KafkaListener(topics = "streamingTopic2", containerFactory = "kafkaListenerContainerFactory")
    public void testListener(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) {

        String value = consumerRecord.value();

        System.out.println("VALUE: " + value);

        ack.acknowledge();
    }

}

I want to create a stream based on post.sent topic. To apply a simple transformation and to send the messages from this stream to test streamingTopic2 topic.

Right now when I send the message into post.sent topic I unable immediately to get it in "streamingTopic2" but after my application restart it start fails with the following error:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition streamingTopic2-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105, 100, 34, 58, 34, 53, 98, 56, 49, 53, 99, 97, 51, 52, 102, 97, 101, 102, 48, 52, 55, 97, 52, 48, 48, 100, 52, 50, 97, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, 83, 69, 78, 84, 34, 44, 34, 101, 120, 116, 101, 114, 110, 97, 108, 80, 111, 115, 116, 73, 100, 34, 58, 34, 48, 53, 54, 97, 57, 51, 49, 101, 45, 56, 97, 53, 100, 45, 52, 100, 52, 52, 45, 97, 101, 50, 48, 45, 53, 99, 51, 53, 52, 56, 57, 52, 98, 97, 53, 49, 34, 44, 34, 99, 104, 97, 116, 78]] from topic [streamingTopic2]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.lang.String` out of START_OBJECT token
 at [Source: (byte[])"{"id":"5b815ca34faef047a400d42a","status":"SENT","externalPostId":"056a931e-8a5d-4d44-ae20-5c354894ba51","chatName":.......":"[truncated 626 bytes]; line: 1, column: 1]
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1342) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1138) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1092) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:63) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:10) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1611) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1234) ~[jackson-databind-2.9.6.jar:2.9.6]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:248) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:224) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:967) ~[kafka-clients-1.1.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:93) ~[kafka-clients-1.1.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1144) ~[kafka-clients-1.1.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993) ~[kafka-clients-1.1.0.jar:na]
    at org.apache.kafka.clien

To post.sent I send the following messages <String, Post> where the Post is my own complex type but I don't know right now how to translate it to <String, String> in kStream() in order to be able to consume it in testListener().

Please suggest how to make it work.

like image 332
alexanoid Avatar asked Jan 02 '23 23:01

alexanoid


1 Answers

Regarding your usage of

return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class)); in order to define the consumerFactory bean

Well, I can't say how you have Produced data into the topic, but the JSON parser is failing.

Cannot deserialize instance of `java.lang.String` out of START_OBJECT token
 at [Source: (byte[])"{"id":"5b815ca34faef047a400d42a","status":"SENT","externalPostId":"056a931e-8a5d-4d44-ae20-5c354894ba51","chatName":.......":"[truncated 626 bytes]; line: 1, column: 1]
...
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize

Based on Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105 ..., I would say you have at some point done a byte[] producer, rather than explicitly defined using StringSerializer or JSONSerializer during production.

You could get around your error by using new StringDeserializer() or even do no conversion at all with ByteArrayDeserializer in your consumerFactory, but then you'll still need to handle how to later parse that event into a object that you want to manipulate and extract fields from.

like image 176
OneCricketeer Avatar answered Jan 04 '23 13:01

OneCricketeer