Kafka KStream-KTable join race condition

I have the following:

KTable<Integer, A> tableA = builder.table("A");
KStream<Integer, B> streamB = builder.stream("B");  

Messages in streamB need to be enriched with data from tableA.

Example data:

Topic A: (1, {name=john})
Topic B: (1, {type=create,...}), (1, {type=update,...}), (1, {type=update...})

In a perfect world, I would like to do

streamB.join(tableA, (b, a) -> { b.name = a.name; return b; })
       .selectKey((k,b) -> b.name)
       .to("C");

Unfortunately this does not work for me because my data is such that every time a message is written to topic A, a corresponding message is also written to topic B (the source is a single DB transaction). After this initial 'creation' transaction, topic B keeps receiving more messages. Sometimes several events per second show up on topic B, but it is also possible for consecutive events to arrive hours apart for a given key.

The reason the simple solution does not work is that the original 'creation' transaction causes a race condition: topics A and B get their messages almost simultaneously, and if the B message reaches the 'join' part of the topology first (say a few ms before the A message gets there), tableA will not yet contain a corresponding entry. At this point the event is lost. I can see this happening on topic C: some events show up, some don't (if I use a leftJoin, all events show up, but some have a null key, which is equivalent to being lost). This is only a problem for the initial 'creation' transaction. After that, every time an event arrives on topic B, the corresponding entry exists in tableA.
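
For reference, the leftJoin variant mentioned above looks roughly like this (same field names as the example; the null branch is where the race shows up):

streamB.leftJoin(tableA, (b, a) -> {
           if (a == null) {
               return b;               // B won the race: no A entry yet, b.name stays unset
           }
           b.name = a.name;
           return b;
       })
       .selectKey((k, b) -> b.name)    // null key for records that lost the race
       .to("C");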

So my question is: how do you fix this?

My current solution is ugly. What I did is create a 'collection of B' and read topic B using:

streamB.groupByKey()
       .aggregate(() -> new CollectionOfB(),
                  (id, b, agg) -> { agg.add(b); return agg; })
       .join(tableA, ...);

Now we have a KTable-KTable join, which is not susceptible to this race condition. The reason I consider this 'ugly' is that after each join I have to send a special message back to topic B that essentially says "remove the event(s) that I just processed from the collection". If this special message is not sent to topic B, the collection will keep growing and every event in the collection will be reported on every join.
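
For illustration, the aggregator then has to handle that special cleanup message as well; a sketch, where isRemoveMarker and removeProcessed are hypothetical CollectionOfB helpers:

(id, b, agg) -> {
    if (b.isRemoveMarker()) {
        agg.removeProcessed(b);  // drop events already reported by a join
    } else {
        agg.add(b);              // buffer the event for the next join
    }
    return agg;
}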

Currently I'm investigating whether a window join would work (read both A and B into KStreams and use a windowed join). I'm not sure that this will work either because there is no upper bound on the size of the window. I want to say, "window starts 1 second 'before' and ends infinity seconds 'after'". Even if I can somehow make this work, I am a bit concerned with the space requirement of having an unbounded window.
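
For completeness, a windowed stream-stream join can only approximate "ends infinity seconds after", since JoinWindows requires finite bounds; a sketch, assuming default serdes and a large finite 'after' window as a stand-in:

KStream<Integer, A> streamA = builder.stream("A");
streamB.join(streamA,
             (b, a) -> { b.name = a.name; return b; },
             JoinWindows.of(Duration.ZERO)
                        .before(Duration.ofSeconds(1)) // A may arrive up to 1s before B
                        .after(Duration.ofDays(365)))  // no true infinity; state retention grows with this
       .selectKey((k, b) -> b.name)
       .to("C");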

Any suggestion would be greatly appreciated.

asked Jan 29 '19 by David Breton


1 Answer

Not sure what version you are using, but the latest release, Kafka 2.1, improves the stream-table join. Even before 2.1, the following holds:

  • the stream-table join is based on event-time
  • Kafka Streams processes each input in offset order, but synchronizes multiple inputs based on event-time (for two input streams, records from the stream with the smaller record timestamps are processed first)
  • if you want to ensure that the table is updated first, the table update record should have a smaller timestamp than the stream record

Since 2.1:

  • to allow for some delay, you can set the max.task.idle.ms configuration to delay processing for the case that only one input topic has input data

Event-time processing order is implemented as best-effort in 2.0 and earlier versions, which can lead to the race condition you describe. In 2.1, the processing order is guaranteed and can only be violated if the max.task.idle.ms timeout is hit.
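
A minimal sketch of setting that config (the application id, bootstrap servers, and the 500 ms value are placeholder examples):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "enrich-b-with-a");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// wait up to 500 ms for data on all input partitions before processing,
// giving the table side a chance to catch up after the DB transaction
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);
KafkaStreams streams = new KafkaStreams(builder.build(), props);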

For details, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization

answered Oct 20 '22 by Matthias J. Sax