I have the following:
KTable<Integer, A> tableA = builder.table("A");
KStream<Integer, B> streamB = builder.stream("B");
Messages in streamB need to be enriched with data from tableA.
Example data:
Topic A: (1, {name=john})
Topic B: (1, {type=create,...}), (1, {type=update,...}), (1, {type=update...})
In a perfect world, I would like to do
streamB.join(tableA, (b, a) -> { b.name = a.name; return b; })
.selectKey((k,b) -> b.name)
.to("C");
Unfortunately this does not work for me, because my data is such that every time a message is written to topic A, a corresponding message is also written to topic B (the source is a single DB transaction). After this initial 'creation' transaction, topic B will keep receiving more messages. Sometimes several events per second show up on topic B, but consecutive events for a given key can also be hours apart.
The reason the simple solution does not work is that the original 'creation' transaction causes a race condition: topics A and B get their messages almost simultaneously, and if the B message reaches the 'join' part of the topology first (say, a few ms before the A message gets there), tableA will not yet contain a corresponding entry. At this point the event is lost. I can see this happening on topic C: some events show up, some don't (with a leftJoin, all events show up, but some have a null key, which is equivalent to being lost). This is only a problem for the initial 'creation' transaction; after that, every time an event arrives on topic B, the corresponding entry already exists in tableA.
So my question is: how do you fix this?
My current solution is ugly. What I do is build a 'collection of B' per key by reading topic B with
streamB.groupByKey()
       .aggregate(CollectionOfB::new, (id, b, agg) -> { agg.add(b); return agg; })
       .join(tableA, ...);
Now we have a KTable-KTable join, which is not susceptible to this race condition. The reason I consider this 'ugly' is that after each join I have to send a special message back to topic B that essentially says "remove the event(s) that I just processed from the collection". If this special message is not sent to topic B, the collection keeps growing, and every event in the collection is reported again on every join.
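For illustration, the whole feedback loop looks roughly like this (a sketch: CollectionOfB, the b.clearMarker flag, B.newClearMarker(), enrich(...), and agg.isEmpty() are all hypothetical names, and default serdes are assumed):

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// Aggregate B per key; a 'clear' control record empties the collection.
KTable<Integer, CollectionOfB> collected = streamB
    .groupByKey()
    .aggregate(
        CollectionOfB::new,
        (id, b, agg) -> {
            if (b.clearMarker) {
                agg.clear();   // control record: drop the already-joined events
            } else {
                agg.add(b);
            }
            return agg;
        });

KStream<Integer, CollectionOfB> joined = collected
    .join(tableA, (agg, a) -> enrich(agg, a))
    .toStream()
    .filter((k, agg) -> !agg.isEmpty()); // skip the update caused by the clear record itself

joined.to("C");                                   // report the enriched events
joined.mapValues(v -> B.newClearMarker()).to("B"); // feedback: "remove what I just processed"

The filter is needed because the clear record itself updates the aggregate and would otherwise re-trigger the join, producing an endless loop of empty results and clear markers, which is part of why this feels ugly.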
Currently I'm investigating whether a window join would work (read both A and B into KStreams and use a windowed join). I'm not sure this will work either, because there is no upper bound on the size of the window: I want to say, "the window starts 1 second 'before' and ends infinitely many seconds 'after'". Even if I can somehow make this work, I am a bit concerned about the space requirements of an unbounded window.
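For reference, this is roughly what that windowed join would look like (a sketch using the 2.1-era JoinWindows API; seen from B's side, the "1 second before / infinity after" above flips into before = "forever", after = 1 second, and a truly unbounded bound is not expressible, so a large finite stand-in has to be picked, which is exactly the space concern):

import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

KStream<Integer, A> streamA = builder.stream("A");

streamB.join(
        streamA,
        (b, a) -> { b.name = a.name; return b; },
        JoinWindows.of(Duration.ZERO)
                   .before(Duration.ofDays(365))  // finite stand-in for "A may be arbitrarily old"
                   .after(Duration.ofSeconds(1))) // A may be time-stamped up to 1s after B (the race)
    .selectKey((k, b) -> b.name)
    .to("C");

The window store has to retain a year of B records per partition under these settings, so this only trades the unbounded-collection problem for an unbounded-window one.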
Any suggestion would be greatly appreciated.
Not sure what version you are using, but the latest Kafka, 2.1, improves the stream-table join. Even before 2.1, the following holds: for a stream-table join, Kafka Streams aligns record processing across both inputs based on record timestamps, so the table-side update should be processed before the stream-side record that carries the same or a later timestamp.
Since 2.1, you can additionally set the max.task.idle.ms configuration to delay processing for the case that only one input topic has input data. The event-time processing order is implemented as best-effort in 2.0 and earlier versions, which can lead to the race condition you describe. In 2.1, processing order is guaranteed and may only be violated if the max.task.idle.ms timeout hits.
For details, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization
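On 2.1+ this is just a config change; a minimal sketch (the 100 ms value is an arbitrary example):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Let a task idle for up to 100 ms when one of its input partitions is
// empty, so the table-side record from the same DB transaction can be
// fetched and processed before the stream-side record.
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 100L);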