I am working on a Kafka Streams application and I am having trouble figuring out how to make an aggregation work.
I have a KStream bankTransactions where the keys are of type String and the values are of type JsonNode, so I configured my app's Serdes with:
// Definition of the different Serdes used in the streams
final Serde<String> stringSerde = Serdes.String();
final Serde<JsonNode> jsonSerde = new JsonSerde();
final Serde<Long> longSerde = Serdes.Long();
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, jsonSerde.getClass().getName());
I want to aggregate the values into a KTable<String, Long> where the keys stay the same but the values are Longs extracted from my JSON.
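To make the intended semantics concrete, here is a plain-Java sketch (no Kafka involved) of what groupByKey().aggregate(...) computes: for each incoming record, the aggregator combines the current balance for that key (or the initializer's 0L for a key seen for the first time) with the record's amount. The Transaction and KeyedTransaction records and the local Aggregator interface are hypothetical stand-ins for the JsonNode values, transaction.get("amount").asLong(), and Kafka's own Aggregator; this is only an illustration, not how Kafka Streams implements it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class BalanceAggregationSketch {
    // Hypothetical stand-in for one JSON transaction value such as {"amount": 100}
    public record Transaction(long amount) {}

    // Hypothetical stand-in for one keyed record of the KStream
    public record KeyedTransaction(String key, Transaction value) {}

    // Mirrors the shape of Kafka Streams' Aggregator<K, V, VR> (local copy, not the real interface)
    public interface Aggregator<K, V, VR> {
        VR apply(K key, V value, VR aggregate);
    }

    // Simulates the per-key running aggregate that the KTable would hold
    public static Map<String, Long> aggregate(List<KeyedTransaction> records,
                                              Supplier<Long> initializer,
                                              Aggregator<String, Transaction, Long> aggregator) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (KeyedTransaction r : records) {
            // A key seen for the first time starts from the initializer's value
            Long current = table.getOrDefault(r.key(), initializer.get());
            table.put(r.key(), aggregator.apply(r.key(), r.value(), current));
        }
        return table;
    }

    public static void main(String[] args) {
        List<KeyedTransaction> records = List.of(
                new KeyedTransaction("alice", new Transaction(100)),
                new KeyedTransaction("bob", new Transaction(50)),
                new KeyedTransaction("alice", new Transaction(-30)));

        // With Long as the aggregate type, the lambda needs no casts
        Map<String, Long> balances = aggregate(records, () -> 0L,
                (key, tx, balance) -> balance + tx.amount());

        System.out.println(balances); // prints {alice=70, bob=50}
    }
}
```

Note that once the aggregate type is known to be Long, the (Long) casts in the aggregator become unnecessary; they are only needed when the compiler has inferred the aggregate type as Object.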
So first I wrote:
KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.as("bank-total-balance")
);
And I get the following error at runtime:
Caused by: org.apache.kafka.streams.errors.StreamsException:
A serializer (value: org.apache.kafka.connect.json.JsonSerializer) is not compatible to
the actual value type (value type: java.lang.Long).
Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
I understand that Kafka is complaining because I'm trying to use the default JSON Serde to serialize a Long. So, after reading Confluent's docs, I tried this:
KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.as("bank-total-balance").withValueSerde(Serdes.Long())
);
But then I get an error at compilation:
Error:(121, 89) java: incompatible types:
org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted
to org.apache.kafka.common.serialization.Serde<java.lang.Object>
I tried different ways to write this code (e.g. using Serdes.Long() instead of my longSerde, trying to parameterize the types of Materialized, and even writing my initializer and aggregator as anonymous classes, Java 7 style), but I can't figure out what I am doing wrong.
So my question is simple: how do I properly specify the Serdes that aggregate should use when they are not the default Serdes?
It seems like the correct syntax is the following:
KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("bank-total-balances")
.withKeySerde(stringSerde)
.withValueSerde(longSerde)
);
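The three type arguments in Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(...) are the key type, the value type, and the type of the store used to materialize the KTable; the store type, KeyValueStore<Bytes, byte[]>, shouldn't change. Then we can define the Serdes used to write into this key-value store. The explicit type arguments are what fix the compilation error: when Materialized.as(...) is called on its own and immediately chained with .withValueSerde(...), the compiler has no target type to infer from, so it infers the key and value types as Object, and a Serde<Long> cannot be passed where a Serde<Object> is expected.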
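The inference problem can be reproduced without Kafka. The sketch below uses hypothetical stand-ins (Serde, StateStore, KeyValueStore and a Mat class shaped like Materialized, with a static generic as(String) factory whose type parameters appear only in the return type). It shows the failing call as a comment, then two ways to give the compiler the types: an explicit type witness, as in the syntax above, or a typed local variable that supplies the target type.

```java
import java.util.Objects;

public class TypeWitnessSketch {
    // Hypothetical stand-ins for Kafka's Serde<T>, StateStore and KeyValueStore (simplified, non-generic store)
    public interface Serde<T> {}
    public interface StateStore {}
    public interface KeyValueStore extends StateStore {}

    // Hypothetical class with the same generic shape as Materialized<K, V, S>
    public static final class Mat<K, V, S extends StateStore> {
        public final String name;
        public Serde<K> keySerde;
        public Serde<V> valueSerde;

        private Mat(String name) { this.name = name; }

        // Like Materialized.as(String): K, V and S appear only in the return type
        public static <K, V, S extends StateStore> Mat<K, V, S> as(String name) {
            return new Mat<>(name);
        }

        public Mat<K, V, S> withKeySerde(Serde<K> serde) {
            this.keySerde = Objects.requireNonNull(serde);
            return this;
        }

        public Mat<K, V, S> withValueSerde(Serde<V> serde) {
            this.valueSerde = Objects.requireNonNull(serde);
            return this;
        }
    }

    public static final Serde<String> STRING_SERDE = new Serde<String>() {};
    public static final Serde<Long> LONG_SERDE = new Serde<Long>() {};

    // Does NOT compile: as(...) is the receiver of withValueSerde, so it has no target
    // type; K and V are inferred as Object, and Serde<Long> is not a Serde<Object>.
    //   Mat.as("bank-total-balances").withValueSerde(LONG_SERDE);

    // Fix 1: explicit type witness
    public static Mat<String, Long, KeyValueStore> withWitness() {
        return Mat.<String, Long, KeyValueStore>as("bank-total-balances")
                .withKeySerde(STRING_SERDE)
                .withValueSerde(LONG_SERDE);
    }

    // Fix 2: a typed local variable supplies the target type for inference
    public static Mat<String, Long, KeyValueStore> withTypedVariable() {
        Mat<String, Long, KeyValueStore> m = Mat.as("bank-total-balances");
        return m.withKeySerde(STRING_SERDE).withValueSerde(LONG_SERDE);
    }

    public static void main(String[] args) {
        System.out.println(withWitness().valueSerde == LONG_SERDE);       // prints true
        System.out.println(withTypedVariable().valueSerde == LONG_SERDE); // prints true
    }
}
```

With the real API, if a specific store name is not needed, Materialized.with(stringSerde, longSerde) should also avoid the problem, since the Serde arguments themselves fix K and V; with Materialized.as(...), one of the two fixes above is needed.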
Note: I got this syntax from a random repo on GitHub; I would still gladly accept a more precise answer backed by some documentation.