I am not able to understand the concept of groupBy/groupByKey and windowing in Kafka Streams. My goal is to aggregate stream data over some time period (e.g. 5 seconds). My streaming data looks something like:
{"value":0,"time":1533875665509}
{"value":10,"time":1533875667511}
{"value":8,"time":1533875669512}
The time is in milliseconds (epoch). The timestamp is in the message itself, not in the key, and I want to average the values over 5-second windows.
Here is the code I am trying, but I cannot get it to work:
builder.<String, String>stream("my_topic")
    .map((key, val) -> { TimeVal tv = TimeVal.fromJson(val); return new KeyValue<Long, Double>(tv.time, tv.value); })
    .groupByKey(Serialized.with(Serdes.Long(), Serdes.Double()))
    .windowedBy(TimeWindows.of(5000))
    .count()
    .toStream()
    .foreach((key, val) -> System.out.println(key + " " + val));
This code does not print anything even though the topic generates a message every two seconds. When I press Ctrl+C, it prints something like:
[1533877059029@1533877055000/1533877060000] 1
[1533877061031@1533877060000/1533877065000] 1
[1533877063034@1533877060000/1533877065000] 1
[1533877065035@1533877065000/1533877070000] 1
[1533877067039@1533877065000/1533877070000] 1
This output does not make sense to me.
Related code:
public class MessageTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        String str = (String) record.value();
        TimeVal tv = TimeVal.fromJson(str);
        return tv.time;
    }
}
public class TimeVal {
    public final long time;
    public final double value;

    public TimeVal(long tm, double val) {
        this.time = tm;
        this.value = val;
    }

    public static TimeVal fromJson(String val) {
        Gson gson = new GsonBuilder().create();
        return gson.fromJson(val, TimeVal.class);
    }
}
Questions:
Why do you need to pass a serializer/deserializer to groupBy? Some of the overloads also take a ValueStore; what is that? When grouped, what does the data look like in the grouped stream?
How is the windowed stream related to the grouped stream?
I was expecting the above to print in a streaming way, i.e., buffer for every 5 seconds, then count, then print. It only prints once I press Ctrl+C on the command prompt, i.e., it prints and then exits.
Windowing allows you to bucket stateful operations by time; without it, your aggregations would accumulate endlessly. A window gives you a snapshot of an aggregate within a given timeframe and can be hopping, tumbling, session, or sliding.
Every topic in Kafka is split into one or more partitions. Kafka partitions data for storing, transporting, and replicating it. Kafka Streams partitions data for processing it. In both cases, this partitioning enables elasticity, scalability, high performance, and fault tolerance.
A sliding window is used for aggregating events. Sliding windows are defined by a record's timestamp, the window size (the maximum time difference, inclusive, between records in the same window), and the window grace period.
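To make that concrete, here is how the window types can be declared in the DSL version used in the question (the 5-second size matches the question, the 1-second hop is an arbitrary choice, and SlidingWindows only exists in newer releases, so it is omitted):

// tumbling: fixed, non-overlapping 5-second buckets
TimeWindows tumbling = TimeWindows.of(5000);
// hopping: 5-second buckets that advance every 1 second, so windows overlap
TimeWindows hopping = TimeWindows.of(5000).advanceBy(1000);
// session: a window closes after 5 seconds of inactivity for a given key
SessionWindows sessions = SessionWindows.with(5000);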
It seems you don't have keys in your input data (correct me if this is wrong), and it further seems that you want to do a global aggregation?
In general, grouping is for splitting a stream into sub-streams. Those sub-streams are built by key (ie, one logical sub-stream per key). You set your timestamp as the key in your code snippet and thus generate a sub-stream per timestamp. I assume this is not intended.
If you want to do a global aggregation, you will need to map all records to a single sub-stream, ie, assign the same key to all records in groupBy(). Note that global aggregations don't scale, as the aggregation must be computed by a single thread. Thus, this will only work for small workloads.
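For concreteness, here is a rough (untested) sketch of how the 5-second average from the question could be computed after re-keying everything to a single constant key. The "all" key and the "sum,count" String accumulator are just assumptions to keep the sketch free of a custom serde, and TimeVal.fromJson is the helper class from the question:

builder.<String, String>stream("my_topic")
    // assign the same constant key to every record so they all land in one sub-stream
    .groupBy((key, val) -> "all", Serialized.with(Serdes.String(), Serdes.String()))
    .windowedBy(TimeWindows.of(5000))
    // accumulate "sum,count" as a String to avoid defining a custom serde in this sketch
    .aggregate(
        () -> "0.0,0",
        (key, val, agg) -> {
            TimeVal tv = TimeVal.fromJson(val);
            String[] parts = agg.split(",");
            double sum = Double.parseDouble(parts[0]) + tv.value;
            long count = Long.parseLong(parts[1]) + 1;
            return sum + "," + count;
        },
        Materialized.with(Serdes.String(), Serdes.String()))
    .toStream()
    .foreach((window, agg) -> {
        String[] parts = agg.split(",");
        System.out.println(window + " avg=" + Double.parseDouble(parts[0]) / Long.parseLong(parts[1]));
    });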
Windowing is applied to each generated sub-stream to build the windows, and the aggregation is computed per window. The windows are built based on the timestamp returned by the TimestampExtractor. It seems you already have an implementation that extracts the timestamp from the value for this purpose.
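For the extractor to actually be used, it has to be registered in the application configuration, roughly like this (the application id and bootstrap servers below are just placeholders):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");       // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
// use the timestamp embedded in the JSON value instead of the record's default timestamp
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MessageTimeExtractor.class);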
This code does not print anything even though the topic is generating messages every two seconds. When I press Ctrl+C then it prints something like
By default, Kafka Streams uses some internal caching, and the cache is flushed on commit. This happens every 30 seconds by default, or when you stop your application. You would need to disable caching to see results earlier (cf. https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html).
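For example, the two settings that matter here can be configured like this (the 1-second commit interval is just an illustration):

// forward every update downstream immediately instead of caching it
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
// optionally commit (and thus flush) more often than the default 30 seconds
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);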
Why do you need to pass serializer/deserializer to group by.
Because data needs to be redistributed, and this happens via a topic in Kafka. Note that Kafka Streams is built for a distributed setup, with multiple instances of the same application running in parallel to scale out horizontally.
Btw: you might also be interested in this blog post about the execution model of Kafka Streams: https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/