
The method withValueSerde() in the type Materialized<> is not applicable

I am grouping a stream by key and trying to aggregate the values by the grouped key. I am following the streams-developer-guide.

I am getting an error on withValueSerde. It says:

The method withValueSerde(Serde<Object>) in the type Materialized<Object,Object,StateStore> is not applicable for the arguments (Serde<Long>)

Code:

KStream<String, String> inputStream = builder.stream("input_topic");
KStream<String, Integer> transformedStream = inputStream.map(
        (key, value) ->  KeyValue.pair(getKey(value), getValue(value)));

KGroupedStream<String, Integer> groupedStream = transformedStream.groupByKey();

KTable<String, Long> aggregatedStream = groupedStream.aggregate(() -> 0L,
        (aggKey, newValue, aggValue) -> aggValue + newValue,
        Materialized.as("aggregated-stream-store").withValueSerde(Serdes.Long()));
el323 asked Dec 23 '22 06:12



1 Answer

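You need to specify the generic types explicitly. Java cannot infer them here, because Materialized.as(...) is the receiver of a chained call and therefore has no target type to infer from. The error message shows this: it reports Materialized<Object,Object,StateStore>, which means the types were left unknown. Add a type witness: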

Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store")
    .withValueSerde(Serdes.Long())
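
To see why the type witness is needed, here is a minimal sketch that reproduces the same inference behavior without any Kafka dependency. FakeMaterialized and FakeSerde are hypothetical stand-ins invented for this illustration, not Kafka classes, and the real Materialized has a third type parameter that is left out here:

```java
public class TypeWitnessDemo {

    // Hypothetical stand-in for a serde: it only records the name of its value type.
    static final class FakeSerde<T> {
        final String typeName;

        FakeSerde(String typeName) {
            this.typeName = typeName;
        }
    }

    // Hypothetical stand-in for Materialized, reduced to two type parameters.
    static final class FakeMaterialized<K, V> {
        final String storeName;
        FakeSerde<V> valueSerde;

        private FakeMaterialized(String storeName) {
            this.storeName = storeName;
        }

        // K and V appear only in the return type, so the compiler can infer them
        // only from a target type or from an explicit type witness.
        static <K, V> FakeMaterialized<K, V> as(String storeName) {
            return new FakeMaterialized<>(storeName);
        }

        FakeMaterialized<K, V> withValueSerde(FakeSerde<V> serde) {
            this.valueSerde = serde;
            return this;
        }
    }

    static FakeMaterialized<String, Long> build() {
        FakeSerde<Long> longSerde = new FakeSerde<>("Long");

        // Does not compile: as(...) is the receiver of the chained call, so it has
        // no target type and K, V fall back to Object. withValueSerde then expects
        // FakeSerde<Object> and rejects FakeSerde<Long>.
        // return FakeMaterialized.as("aggregated-stream-store").withValueSerde(longSerde);

        // Compiles: the type witness fixes K and V before the chained call.
        return FakeMaterialized.<String, Long>as("aggregated-stream-store")
                .withValueSerde(longSerde);
    }

    public static void main(String[] args) {
        FakeMaterialized<String, Long> m = build();
        System.out.println(m.storeName + " -> " + m.valueSerde.typeName);
        // prints "aggregated-stream-store -> Long"
    }
}
```

Uncommenting the first return statement should produce an error of the same shape as the one in the question, with Object in place of the intended types.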
Matthias J. Sax answered May 17 '23 10:05
