
End-of-window outer join with KafkaStreams

I have a Kafka topic where I expect messages with two different key types, old and new, e.g. "1-new", "1-old", "2-new", "2-old". Keys are unique, but some might be missing.

Using Kotlin and the Kafka Streams API, I can already log the messages for which both the new and the old record share the same key id:

    val windows = JoinWindows.of(Duration.of(2, MINUTES).toMillis())

    val newStream = stream.filter({ key, _ -> isNew(key) })
            .map({key, value ->  KeyValue(key.replace(NEW_PREFIX, ""), value) })

    val oldStream = stream.filter({ key, _ -> isOld(key) })
            .map({key, value ->  KeyValue(key.replace(OLD_PREFIX, ""), value) })

    val joined = newStream.join(oldStream,
            { value1, value2 -> "$value1&$value2" }, windows)

    joined.foreach({ key, value ->
        log.info { "JOINED $key : $value" }
    })

Now I want to find the new/old keys whose counterpart is missing within the time window for some reason. Is it possible to achieve this with the Kafka Streams API?

In my case, when key "1-old" is received and "1-new" does not arrive within 2 minutes, and only in this case, I want to report id 1 as suspicious.

Nikolay Kuznetsov asked Jan 02 '23 23:01


2 Answers

The DSL might not give you what you want out of the box. However, you can use the Processor API. Having said this, leftJoin can actually be used to do the "heavy lifting": after the leftJoin, you can use .transform(...) with an attached state store to "clean up" the data further.

For each old&null record you receive, put it into the store. If you later receive a matching old&new record, remove the entry from the store. Furthermore, register a punctuation, and on each punctuation call scan the store for entries that are "old enough" that you can be sure no later old&new join result will be produced. For those entries, emit old&null and remove them from the store.
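The bookkeeping described above can be sketched in plain Kotlin, without any Kafka dependency, to make the store and punctuation logic concrete. This is only a model under assumptions: `PendingStore`, `onJoinResult`, `punctuate` and `maxAgeMs` are hypothetical names, an in-memory map stands in for the state store, and the "old enough" threshold is assumed to equal the 2-minute join window.

```kotlin
// Plain-Kotlin model of the clean-up step; not Kafka Streams code.
// All names here are hypothetical and only illustrate the idea.
class PendingStore(private val maxAgeMs: Long) {
    // id -> timestamp at which the unmatched old&null result was first seen
    private val pending = LinkedHashMap<String, Long>()

    // Call for every leftJoin result; newValue == null models "old&null".
    fun onJoinResult(key: String, newValue: String?, timestampMs: Long) {
        if (newValue == null) {
            pending.putIfAbsent(key, timestampMs) // remember the unmatched "old"
        } else {
            pending.remove(key)                   // a later old&new cancels it
        }
    }

    // Call periodically; returns and evicts ids unmatched for at least maxAgeMs.
    fun punctuate(nowMs: Long): List<String> {
        val expired = pending.filterValues { nowMs - it >= maxAgeMs }.keys.toList()
        expired.forEach { pending.remove(it) }
        return expired
    }
}

fun main() {
    val store = PendingStore(maxAgeMs = 120_000)
    store.onJoinResult("1", null, 0)        // old&null for id 1
    store.onJoinResult("2", null, 10_000)   // old&null for id 2
    store.onJoinResult("2", "new", 30_000)  // old&new for id 2 arrives later
    println(store.punctuate(60_000))        // prints []
    println(store.punctuate(120_000))       // prints [1]
}
```

In a real topology, `onJoinResult` would correspond to the body of transform() and `punctuate` to the scheduled punctuation callback, with the map replaced by a key-value state store.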

As an alternative, you can also omit the join and do everything in a single transform() with state. For this, you would need to merge the old and new streams with KStream#merge() and call transform() on the merged stream.
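A rough plain-Kotlin model of this join-free variant might look as follows. It is a sketch under assumptions rather than Kafka Streams code: `Side`, `MergedTracker`, `process` and `scan` are hypothetical names, in-memory maps stand in for the state store, and timestamps are passed in explicitly. Because either side may arrive first, both unmatched "old" and unmatched "new" records are remembered.

```kotlin
import kotlin.math.abs

// Plain-Kotlin model of a single stateful step over the merged stream.
// All names are hypothetical; a real implementation would use a state store.
enum class Side { OLD, NEW }

class MergedTracker(private val windowMs: Long) {
    private val unmatchedOld = HashMap<String, Long>() // id -> timestamp of "old"
    private val unmatchedNew = HashMap<String, Long>() // id -> timestamp of "new"

    // Handle one record; id is the key with its old/new marker stripped.
    fun process(id: String, side: Side, timestampMs: Long) {
        val mine = if (side == Side.OLD) unmatchedOld else unmatchedNew
        val other = if (side == Side.OLD) unmatchedNew else unmatchedOld
        val otherTs = other[id]
        if (otherTs != null && abs(timestampMs - otherTs) <= windowMs) {
            other.remove(id)       // counterpart seen within the window: matched
        } else {
            mine[id] = timestampMs // keep waiting for the counterpart
        }
    }

    // Returns ids whose "old" waited longer than the window; drops stale "new" entries.
    fun scan(nowMs: Long): List<String> {
        unmatchedNew.entries.removeAll { nowMs - it.value > windowMs }
        val suspicious = unmatchedOld.filterValues { nowMs - it > windowMs }.keys.sorted()
        suspicious.forEach { unmatchedOld.remove(it) }
        return suspicious
    }
}

fun main() {
    val tracker = MergedTracker(windowMs = 120_000)
    tracker.process("1", Side.OLD, 0)
    tracker.process("2", Side.NEW, 5_000)
    tracker.process("2", Side.OLD, 20_000) // matches the earlier "new"
    println(tracker.scan(130_000))         // prints [1]
}
```

Here `scan` can be driven either by a punctuation or by each processed record.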

Note: instead of registering a punctuation, you can also put the "scan logic" into the transform and execute it each time you process a record.

Matthias J. Sax answered Jan 05 '23 12:01


If I understand your question correctly, you only want to report ids as suspicious when there is an "old" without a corresponding "new" within the 2-minute window.

If that's the case, you'll want to use a left join:

    val leftJoined = oldStream.leftJoin(newStream, ...)
            .filter(/* condition: the value expected from the "new" stream is null */)

HTH

bbejeck answered Jan 05 '23 14:01