 

KStream batch process windows

I want to batch messages with the KStream interface.

I have a stream of keys/values. I tried to collect them in a tumbling window and then process the complete window at once.

builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC)
                .aggregateByKey(
                        HashMap::new,
                        (aggKey, value, aggregate) -> {
                            aggregate.put(value.getUuid(), value);
                            return aggregate;
                        },
                        TimeWindows.of("intentWindow", 100),
                        longSerde, mapSerde)
                .foreach((wk, values) -> {

The problem is that foreach gets called on every update to the KTable. I would like to process the whole window once it is complete, i.e. collect the data from 100 ms and then process it all at once in foreach.

16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 294
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 295
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 296
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 297
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 298
16:** - windows from 2016-08-23T10:56:26 to 2016-08-23T10:56:27, key 2016-07-21T14:38:16.288, value count: 299
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 1
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 2
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 3
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 4
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 5
16:** - windows from 2016-08-23T10:56:27 to 2016-08-23T10:56:28, key 2016-07-21T14:38:16.288, value count: 6

At some point a new window starts with one entry in the map, so I can't even tell when a window is complete.

Any hints on how to batch process in Kafka Streams?

asked Aug 23 '16 by samst


People also ask

Can Kafka do batch processing?

Apache Kafka is a robust messaging queue that enables the transfer of high volumes of messages from one endpoint to another. Creating a Kafka batch process allows multiple messages to be processed together.

What is a KStream?

KStream is an abstraction of a record stream of KeyValue pairs, i.e., each record is an independent entity/event in the real world. For example, a user K might buy two items I1 and I2, and thus there might be two records <K:I1>, <K:I2> in the stream.

What is Kafka streaming windowing?

Windowing. Windowing allows you to bucket stateful operations by time, without which your aggregations would endlessly accumulate. A window gives you a snapshot of an aggregate within a given timeframe, and can be set as hopping, tumbling, session, or sliding.
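For reference, a minimal sketch using the same 0.10.0-era API as the question (TimeWindows covers tumbling and hopping windows; session and sliding windows arrived in later releases):

    // Tumbling window: fixed-size, non-overlapping 1-second buckets.
    TimeWindows tumbling = TimeWindows.of("tumblingWindow", 1000);
    // Hopping window: 1-second windows advancing every 500 ms, so they overlap.
    TimeWindows hopping = TimeWindows.of("hoppingWindow", 1000).advanceBy(500);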

What is KStream KTable?

KStream, KTable and GlobalKTable. Kafka Streams provides two abstractions for Streams and Tables. KStream handles the stream of records. On the other hand, KTable manages the changelog stream with the latest state of a given key. Each data record represents an update.
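As an illustration, the same topic can be read both ways (a sketch against the 0.10.0-era KStreamBuilder, reusing the serdes and topic name from the question):

    // As a KStream, every record is an independent event.
    KStream<Long, IntentUpdateEvent> events =
        builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC);
    // As a KTable, each record is an update to the latest value for its key.
    KTable<Long, IntentUpdateEvent> latest =
        builder.table(longSerde, updateEventSerde, CONSUME_TOPIC);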


3 Answers

My actual task is to push updates from the stream to Redis, but I don't want to read / update / write individually even though Redis is fast. My solution for now is to use KStream.process() to supply a processor which adds records to a queue in process() and then processes the whole queue in punctuate().

public class BatchedProcessor extends AbstractProcessor<Long, IntentUpdateEvent> {

    private final Writer writer;
    private final long schedulePeriodic;

    BatchedProcessor(Writer writer, long schedulePeriodic) {
        this.writer = writer;
        this.schedulePeriodic = schedulePeriodic;
    }

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        // schedule punctuate() to be called every schedulePeriodic ms
        context.schedule(schedulePeriodic);
    }

    @Override
    public void punctuate(long timestamp) {
        super.punctuate(timestamp);
        // drain the queue in one batch, then commit the consumed offsets
        writer.processQueue();
        context().commit();
    }

    @Override
    public void process(Long key, IntentUpdateEvent intentUpdateEvent) {
        // just enqueue; the batch is processed in punctuate()
        writer.addToQueue(intentUpdateEvent);
    }
}
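Wiring it into the topology might look like this (hypothetical wiring; writer and the serdes are assumed from the question, and KStream.process() is the 0.10.0-era call that terminates the stream):

    // enqueue every record; the queue is flushed every 100 ms in punctuate()
    builder.stream(longSerde, updateEventSerde, CONSUME_TOPIC)
           .process(() -> new BatchedProcessor(writer, 100));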

I still have to test it, but it solves the problem I had. One could easily write such a processor in a very generic way. The API is very neat and clean, but a processBatched((List batchedMessages) -> ..., timeInterval OR countInterval) that collects KeyValues in a store, uses punctuate to process the batch, and commits at that point might be a useful addition; a rough sketch of that idea follows below.

But maybe it was intended that this be solved with a Processor, keeping the API purely focused on one-message-at-a-time, low-latency processing.
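For what it's worth, an untested sketch of that generic idea might look like this (all names are illustrative; it batches in memory rather than in a state store, so a crash between punctuations would lose the in-flight batch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    // Hypothetical generic batcher: collect records in process(), hand the
    // whole batch to a callback from punctuate(), then commit.
    public class BatchingProcessor<K, V> extends AbstractProcessor<K, V> {

        private final Consumer<List<KeyValue<K, V>>> onBatch;
        private final long intervalMs;
        private final List<KeyValue<K, V>> batch = new ArrayList<>();

        public BatchingProcessor(Consumer<List<KeyValue<K, V>>> onBatch, long intervalMs) {
            this.onBatch = onBatch;
            this.intervalMs = intervalMs;
        }

        @Override
        public void init(ProcessorContext context) {
            super.init(context);
            context.schedule(intervalMs); // punctuate() fires every intervalMs (0.10.0-era API)
        }

        @Override
        public void process(K key, V value) {
            batch.add(new KeyValue<>(key, value)); // just collect; work happens in punctuate()
        }

        @Override
        public void punctuate(long timestamp) {
            if (batch.isEmpty()) {
                return;
            }
            onBatch.accept(new ArrayList<>(batch)); // process the batch in one go
            batch.clear();
            context().commit();
        }
    }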

answered Nov 15 '22 by samst


Right now (as of Kafka 0.10.0.0 / 0.10.0.1): The windowing behavior you are describing is "working as expected". That is, if you are getting 1,000 incoming messages, you will (currently) always see 1,000 updates going downstream with the latest versions of Kafka / Kafka Streams.

Looking ahead: The Kafka community is working on new features to make this update-rate behavior more flexible (e.g. to allow for what you described above as your desired behavior). See KIP-63: Unify store and downstream caching in streams for more details.
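For readers on later versions: KIP-63 shipped in Kafka 0.10.1, where the downstream update rate can be tuned via the record cache. A rough sketch of the relevant configuration (application id and broker address are illustrative):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "batching-example");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // A larger cache and a longer commit interval let consecutive updates per
    // key be coalesced, so far fewer updates flow downstream.
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);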

answered Nov 15 '22 by Michael G. Noll


====== Update ======

On further testing, this does not work. The correct approach is to use a processor as outlined by @friedrich-nietzsche. I am down-voting my own answer.... grrrr.

===================

I am still wrestling with this API (but I love it, so it's time well spent :)), and I am not sure what you're trying to accomplish downstream from where your code sample ended, but it looks similar to what I got working. At a high level:

An object is read from the source. It represents a key and 1:∞ events, and I want to publish the total number of events per key every 5 seconds (or TP5s, transactions per 5 seconds). The beginning of the code looks the same, but I use:

  1. KStreamBuilder.stream
  2. reduceByKey
  3. to a window(5000)
  4. to a new stream which gets the accumulated value for each key every 5 secs.
  5. map that stream to a new KeyValue per key
  6. to the sink topic.

In my case, each window period I can reduce all events to one event per key, so this works. If you want to retain all the individual events per window, I assume you could use reduce to map each instance to a collection of instances (possibly with the same key, or you might need a new key), and at the end of each window period the downstream stream would get a bunch of collections of your events (or maybe just one collection of all the events) in one go. It looks like this, sanitized and Java 7-ish (a sketch of the collection variant follows after the code):

    builder.stream(STRING_SERDE, EVENT_SERDE, SOURCE_TOPICS)
        .reduceByKey(eventReducer, TimeWindows.of("EventMeterAccumulator", 5000), STRING_SERDE, EVENT_SERDE)
        .toStream()
        .map(new KeyValueMapper<Windowed<String>, Event, KeyValue<String, Event>>() {
            public KeyValue<String, Event> apply(final Windowed<String> key, final Event finalEvent) {
                return new KeyValue<String, Event>(key.key(), new Event(key.window().end(), finalEvent.getCount()));
            }
        }).to(STRING_SERDE, EVENT_SERDE, SINK_TOPIC);
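A sketch of the variant mentioned above that retains every event per window, using aggregateByKey to build up a list (eventListSerde is an assumed serde for List<Event>; note that before KIP-63 the downstream still sees one update per incoming record):

    builder.stream(STRING_SERDE, EVENT_SERDE, SOURCE_TOPICS)
        .aggregateByKey(
            // start each window with an empty list and append every event
            () -> new ArrayList<Event>(),
            (key, event, list) -> { list.add(event); return list; },
            TimeWindows.of("EventCollector", 5000),
            STRING_SERDE, eventListSerde)
        .toStream()
        // downstream receives the list accumulated so far for each window
        .map((windowedKey, events) -> new KeyValue<>(windowedKey.key(), events))
        .to(STRING_SERDE, eventListSerde, SINK_TOPIC);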
answered Nov 15 '22 by Nicholas