
Kafka Stream count on time window not reporting zero values

I'm using Kafka Streams to calculate how many events occurred in the last 3 minutes, using a hopping time window:

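import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;

public class ViewCountAggregator {

    void buildStream(KStreamBuilder builder) {

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        KStream<String, String> views = builder.stream(stringSerde, stringSerde, "streams-view-count-input");
        KStream<String, Long> viewCount = views
            // re-key each record by its value, so counts are per distinct value
            .groupBy((key, value) -> value)
            // hopping window: 3 minutes long, advancing every minute
            .count(TimeWindows.of(TimeUnit.MINUTES.toMillis(3)).advanceBy(TimeUnit.MINUTES.toMillis(1)))
            .toStream()
            // drop the window from the Windowed<String> key and keep only the plain key
            .map((key, value) -> new KeyValue<>(key.key(), value));

        viewCount.to(stringSerde, longSerde, "streams-view-count-output");
    }

    public static void main(String[] args) throws Exception {
        // some not so important initialization code
        ...
    }

}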

When I run a consumer and push some messages to the input topic, the consumer receives the following updates as time passes:

single  1
single  1
single  1
five    1
five    4
five    5
five    4
five    1

Which is almost correct, but it never receives updates for:

single  0
five    0

Without these updates, my consumer, which updates a counter, will never set it back to zero when there are no events for a longer period of time. I'm expecting the consumed messages to look like this:

single  1
single  1
single  1
single  0
five    1
five    4
five    5
five    4
five    1
five    0

Is there some configuration option or argument I'm missing that would help me achieve this behavior?

Marian Galik asked Jul 14 '17 07:07



1 Answer

"Which is almost correct, but it never receives updates for:"

First, the computed output is correct.

Second, why is it correct:

If you apply a windowed aggregate, only those windows that actually have content are created (all other systems I am familiar with would produce the same output). Thus, if for some key there is no data for a time period longer than the window size, no window is instantiated, and so there is no count at all.
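To make this concrete, here is a minimal plain-Java sketch (no Kafka dependency) of how a hopping window assigns records to windows and counts them. The class and method names are hypothetical and chosen only for illustration; this is not the Kafka Streams implementation. The point it shows is that a window only gets an entry once some record falls into it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: hypothetical helper, not part of Kafka Streams.
class HoppingWindowCount {

    // Start times of all windows [start, start + sizeMs) that contain the timestamp.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest window start (aligned to advanceMs) that still covers the timestamp.
        long first = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    // Counts events per window start. Windows that no event falls into
    // never get an entry, so there is nothing that could report a zero.
    static Map<Long, Long> countPerWindow(long[] eventTimestampsMs, long sizeMs, long advanceMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : eventTimestampsMs) {
            for (long start : windowStartsFor(ts, sizeMs, advanceMs)) {
                counts.merge(start, 1L, Long::sum);
            }
        }
        return counts;
    }
}
```

With a 3-minute window advancing by 1 minute and events at 200 s and 1000 s, countPerWindow has entries for the windows starting at 60 s, 120 s, 180 s and at 840 s, 900 s, 960 s. The windows in between are simply absent from the map.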

The reason for not instantiating windows that have no content is quite simple: the processor cannot know all keys. In your example you have two keys, but a third key might show up later. Would you expect to get <thirdKey,0> from the very beginning? Also, as data streams are infinite in nature, keys might go away and never reappear. If you remembered all seen keys and emitted <key,0> whenever there is no data for a key that disappeared, would you emit <key,0> forever?

I don't want to say that your expected result/semantics do not make sense. It's just a very specific use case of yours and not applicable in general. Hence, stream processors don't implement it.

Third: What can you do?

There are multiple options:

  1. Your consumer can keep track of which keys it has seen and, using the embedded record timestamps, figure out whether a key is "missing" and then set the counter for that key to zero. (For this, it might also help to remove the map step and preserve the Windowed<K> type for the key, so that the consumer gets the information about which window a record belongs to.)
  2. Add a stateful #transform() step to your Streams application that does the same thing as described in (1). For this, it might be helpful to register a punctuation callback.

Approach (2) should make it easier to track keys, as you can attach a state store to your transform step and thus don't need to deal with state (and failure/recovery) in your downstream consumer.

However, the tricky part for both approaches is still deciding when a key is missing, i.e., how long you wait before you produce <key,0>. Note that data might arrive late (aka out of order), and even if you emitted <key,0>, a late-arriving record might produce a <key,1> message after your code emitted the <key,0> record. But maybe this is not really an issue in your case, as it seems you use only the latest window anyway.
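The bookkeeping shared by options (1) and (2) could be sketched as follows, in plain Java without Kafka dependencies. ZeroFillTracker, onUpdate, expire and the timeout parameter are all hypothetical names introduced for illustration. The caller has to pick the timeout, for example the window size plus some slack for late records. In option (1) this would live in the consumer; in option (2) the map would be a state store and expire would be driven by a punctuation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: remembers when each key was
// last seen and reports keys that have been silent for longer than a timeout.
class ZeroFillTracker {
    private final long timeoutMs;
    private final Map<String, Long> lastSeenMs = new HashMap<>();

    ZeroFillTracker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Call for every count update; recordTimestampMs is the record's embedded timestamp.
    // Keeps the largest timestamp seen, so out-of-order records do not move it backwards.
    void onUpdate(String key, long recordTimestampMs) {
        lastSeenMs.merge(key, recordTimestampMs, Long::max);
    }

    // Call periodically. Returns the keys whose counter should be reset to zero
    // and forgets them, so <key,0> is produced once rather than forever.
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeenMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > timeoutMs) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

Because an expired key is forgotten, a record that arrives after the zero was emitted just registers the key again. That is the late-arrival case described above: the downstream counter goes from 0 back to a positive value, and a larger timeout makes this less likely at the cost of a later reset.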

Last but not least, one more comment: it seems that you are using only the latest count and that newer windows overwrite older windows in your downstream consumer. Thus, it might be worth exploring "Interactive Queries" to tap into the state of your count operator directly, instead of consuming the topic and updating some other state. This might allow you to redesign and simplify your downstream application significantly. Check out the docs and a very good blog post about Interactive Queries for more details.

Matthias J. Sax answered Sep 22 '22 23:09
