Is it possible to get only the final result of a window in Kafka Streams by suppressing the intermediate results?
I cannot get this to work. What is wrong with my code?
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded())) // not working
.toStream()
.print(Printed.toSysOut())
It leads to this error:
Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001:
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
Code / Error details: https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380
The problem is a confusing asymmetry in the way that Streams automatically wraps explicit serdes during windowing, but does not automatically wrap the default serde. IMHO, this is an oversight that should be corrected, so I've filed: https://issues.apache.org/jira/browse/KAFKA-7806
As others have noted, the solution is to explicitly set the key serde upstream and not rely on the default key serde. You can either:
Set the serdes on the windowed aggregation with Materialized:
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.print(Printed.toSysOut())
(as Nishu recommended)
(Note that it is not necessary to name the count operation; naming it has the side effect of making it queryable.)
Or set the serdes further upstream, for example on the input:
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic, Consumed.with(Serdes.String(), Serdes.Double()))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.print(Printed.toSysOut())
(as wardziniak recommended)
The choice is yours; I don't think it makes much difference in this case. If you were doing a different aggregation than count, you'd probably be setting the value serde via Materialized anyway, so the former might be the more uniform style.
I also noticed that your window definition doesn't have a grace period set. The window close time is defined as window end + grace period, and the default grace period is 24 hours, so you wouldn't see anything emitted from the suppression until 24 hours' worth of data have run through the app.
For your testing effort, I'd recommend trying:
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ZERO))
In production, you'll want to select a grace period that balances the amount of event lateness you expect in your stream with the amount of emission promptness you wish to see from the suppression.
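To make the arithmetic concrete, here is a small plain-Java sketch (no Kafka dependency) of the "window end + grace period" rule. The class and method names are hypothetical, and it assumes tumbling windows aligned to the epoch, so treat it as an illustration rather than the actual Streams implementation:

```java
import java.time.Duration;

public class WindowCloseSketch {
    // Start of the tumbling window containing the timestamp
    // (assumption: windows are aligned to the epoch).
    static long windowStart(long timestampMs, Duration size) {
        return timestampMs - (timestampMs % size.toMillis());
    }

    // Window close time as described above: window end + grace period.
    static long windowCloseTime(long timestampMs, Duration size, Duration grace) {
        long windowEnd = windowStart(timestampMs, size) + size.toMillis();
        return windowEnd + grace.toMillis();
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(15);
        long eventTs = 20_000L; // hypothetical event timestamp in milliseconds

        // With zero grace the window [15000, 30000) closes right at its end.
        System.out.println(windowCloseTime(eventTs, size, Duration.ZERO)); // prints 30000

        // With a 24-hour grace period the same window closes a day later.
        System.out.println(windowCloseTime(eventTs, size, Duration.ofHours(24))); // prints 86430000
    }
}
```

Suppression can only emit the final count once stream time has advanced to that close time, which is why the 24-hour default looks like "nothing is emitted" in a short test.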
One final note: I noticed in your gist that you haven't changed the default caching or commit interval. As a result, the count operator itself will buffer updates for the default 30 seconds before passing them on to suppression. This is a good config for production, so you don't create a bottleneck to your local disk or to the Kafka broker, but it might surprise you while you're testing.
Typically for tests (or interactively trying stuff out), I'll disable caching and set the commit interval short for maximum developer sanity:
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
Sorry about the serde oversight. I hope we get KAFKA-7806 addressed soon.
I hope this helps!
The problem is with the key serde. The windowedBy operation produces keys of type Windowed<String>, but .suppress() falls back to the default key serde.
Hence you need to define the key serde on the state store when calling the count method, as shown below:
builder.stream<String,Double>(inputTopic)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count(Materialized.<String, Long, WindowStore<Bytes,byte[]>>as("count").withCachingDisabled().withKeySerde(Serdes.String()))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream()
.print(Printed.toSysOut());