Kafka Streams: How to specify Serdes in a stream aggregation?

I am working on a Kafka streams application and I have some trouble figuring out how to make an aggregation work.

I have a KStream bankTransactions whose keys are of type String and values of type JsonNode, so I configured my app's Serdes with:

// Definition of the different Serdes used in the streams
final Serde<String> stringSerde = Serdes.String();
final Serde<JsonNode> jsonSerde = new JsonSerde();
final Serde<Long> longSerde = Serdes.Long();

config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, jsonSerde.getClass().getName());
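(For reference, since the question doesn't show where `jsonSerde` comes from: a `Serde<JsonNode>` can be assembled from Kafka Connect's JSON serializer/deserializer pair with `Serdes.serdeFrom` — a minimal sketch, assuming the `connect-json` artifact is on the classpath; the class name `JsonSerdeFactory` is just an illustration:)

```java
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;

public class JsonSerdeFactory {
    // Pair Connect's JsonSerializer/JsonDeserializer into a single Serde<JsonNode>
    public static Serde<JsonNode> jsonSerde() {
        return Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
    }
}
```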

I want to aggregate the values into a KTable<String, Long> where the keys stay the same but the values are Longs extracted from my JSON.

So firstly I wrote:

KTable<String, Long> totalBalances = bankTransactions
        .groupByKey()
        .aggregate(
                () -> 0L,
                (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
                Materialized.as("bank-total-balance")
        );

And I get the following error at runtime:

Caused by: org.apache.kafka.streams.errors.StreamsException:
A serializer (value: org.apache.kafka.connect.json.JsonSerializer) is not compatible to
the actual value type (value type: java.lang.Long).
Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

I understand that Kafka is complaining because I'm trying to use the default JSON Serde to serialize a Long. So, reading Confluent's documentation, I tried this:

KTable<String, Long> totalBalances = bankTransactions
        .groupByKey()
        .aggregate(
                () -> 0L,
                (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
                Materialized.as("bank-total-balance").withValueSerde(Serdes.Long())
        );

But then I get an error at compilation:

Error:(121, 89) java: incompatible types:
org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted
to org.apache.kafka.common.serialization.Serde<java.lang.Object>

I tried different ways to write this code (e.g. using Serdes.Long() instead of my longSerde, trying to parameterize the types of Materialized, and even writing my initializer and aggregator as functions, Java 7 style) but I can't figure out what I am doing wrong.

So my question is simple: how do I properly specify the Serdes that aggregate should use when they are not the default Serdes?

asked Mar 05 '23 by statox
1 Answer

It seems like the correct syntax is the following:

KTable<String, Long> totalBalances = bankTransactions
        .groupByKey()
        .aggregate(
                () -> 0L,
                (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("bank-total-balances")
                        .withKeySerde(stringSerde)
                        .withValueSerde(longSerde)
        );

The three type parameters on Materialized are those of the key, the value, and the state store used to materialize the KTable; this last one (KeyValueStore<Bytes, byte[]>) shouldn't change. We can then define the Serdes used to write to this key-value store.
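As a complement (this is not from the question, but from the same Kafka Streams DSL), the serdes can also be pinned at the grouping step with `Grouped.with(...)`, so nothing in the aggregation relies on the default serdes. A sketch, where the topic name `"bank-transactions"` is a placeholder:

```java
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class BankBalanceTopology {
    public static Topology build() {
        final Serde<String> stringSerde = Serdes.String();
        final Serde<JsonNode> jsonSerde =
                Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
        final Serde<Long> longSerde = Serdes.Long();

        StreamsBuilder builder = new StreamsBuilder();
        // Consumed.with pins the serdes for reading the input topic
        KStream<String, JsonNode> bankTransactions =
                builder.stream("bank-transactions", Consumed.with(stringSerde, jsonSerde));

        KTable<String, Long> totalBalances = bankTransactions
                // Grouped.with pins the serdes for the grouping step
                // (and for any repartition topic, if one is needed)
                .groupByKey(Grouped.with(stringSerde, jsonSerde))
                .aggregate(
                        () -> 0L,
                        (key, transaction, balance) ->
                                balance + transaction.get("amount").asLong(),
                        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("bank-total-balances")
                                .withKeySerde(stringSerde)
                                .withValueSerde(longSerde));
        return builder.build();
    }
}
```

This way every stage of the aggregation declares its serdes explicitly, which makes the "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters" class of error impossible by construction.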

Note: I got this syntax from a random repo found on GitHub; I would still gladly accept an answer backed by some documentation.

answered Mar 15 '23 by statox