I have a Kafka topic where I expect messages with two different key types: old and new.
i.e. "1-new", "1-old", "2-new", "2-old". Keys are unique, but some might be missing.
Now using Kotlin and KafkaStreams API I can log those messages with have same key id from new and old.
    val windows = JoinWindows.of(Duration.of(2, MINUTES).toMillis())
    val newStream = stream.filter({ key, _ -> isNew(key) })
            .map({key, value ->  KeyValue(key.replace(NEW_PREFIX, ""), value) })
    val oldStream = stream.filter({ key, _ -> isOld(key) })
            .map({key, value ->  KeyValue(key.replace(OLD_PREFIX, ""), value) })
    val joined = newStream.join(oldStream,
            { value1, value2 -> "$value1&$value2" }, windows)
    joined.foreach({ key, value ->
        log.info { "JOINED $key : $value" }
    })
Now I want to know new/old keys which are missing in time window for some reason. Is it possible to achieve with KafkaStreams API?
In my case when key "1-old" is received and "1-new" is not within 2 minutes only in this case I want to report id 1 as suspicious. 
The DSL might not give you what you want. However, you can use Processor API. Having say this, the leftJoin can actually be used to do the "heavy lifting". Thus, after the leftJoin you can use .transform(...) with an attached state to "clean up" the data further.
For each old&null record you receive, put it into the store. If you receive a later old&new you can remove it from the store. Furthermore, you register a punctuation and on each punctuation call, you scan the store for entries that are "old enough" so you are sure no later old&new join result will be produced. For those entries, you emit old&null and remove from them from the store.
As an alternative, you can also omit the join, and do everything in a single transform() with state. For this, you would need to KStream#merge() old and new stream and call transform() on the merged stream.
Note: instead of registering a punctuation, you can also put the "scan logic" into the transform and execute it each time you process a record.
If I understand your question correctly you only want to report id's as suspicious when there is an "old" without a corresponding "new" within the 2-minute window.
If that's the case you'll want to use a left join :
val leftJoined = oldStream.leftJoin(newStream,...).filter(condition where value expected from "new" stream is null);
HTH
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With