 

KStream-KTable join writing to the KTable: how to sync the join with the KTable write?

I'm having an issue with how the following topology behaves:

String topic = config.topic();

KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

// Receive a stream of various events
topology.eventsStream()
    // Only process events that are implementing MyEvent
    .filter((k, v) -> v instanceof MyEvent)
    // Cast to ease the code
    .mapValues(v -> (MyEvent) v)
    // rekey by data id
    .selectKey((k, v) -> v.data.id)
    .peek((k, v) -> L.info("Event:"+v.action))
    // join the event with the according entry in the KTable and apply the state mutation
    .leftJoin(myTable, eventHandler::handleEvent, UUIDSerdes.get(), EventSerdes.get())
    .peek((k, v) -> L.info("Updated:" + v.id + "-" + v.id2))
    // write the updated state to the KTable.
    .to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

My issue happens when I receive different events at the same time. Since my state mutation is done by the leftJoin and only then written by the to method, the following can occur if event 1 and event 2 are received at the same time with the same key:

event1 joins with state A => state A mutated to state X
event2 joins with state A => state A mutated to state Y
state X written to the KTable topic
state Y written to the KTable topic

Because of that, state Y doesn't have the changes from event1, so I lost data.

Here's what I see in the logs (the Processing:... part is logged from inside the value joiner):

Event:Event1
Event:Event2
Processing:Event1, State:none
Updated:1-null
Processing:Event2, State:none
java.lang.IllegalStateException: Event2 event received but we don't have data for id 1

Event1 can be considered the creation event: it will create the entry in the KTable, so it doesn't matter if the state is empty. Event2, however, needs to apply its changes to an existing state, but it doesn't find any because the first state mutation still hasn't been written to the KTable (it still hasn't been processed by the to method).

Is there any way to make sure that my leftJoin and my writes into the KTable are done atomically?

Thanks

Update & current solution

Thanks to @Matthias's answer, I was able to find a solution using a Transformer.

Here's what the code looks like:

Here's the transformer:

public class KStreamStateLeftJoin<K, V1, V2> implements Transformer<K, V1, KeyValue<K, V2>> {

    private final String                    stateName;
    private final ValueJoiner<V1, V2, V2>   joiner;
    private final boolean                   updateState;

    private KeyValueStore<K, V2>            state;

    public KStreamStateLeftJoin(String stateName, ValueJoiner<V1, V2, V2> joiner, boolean updateState) {
        this.stateName = stateName;
        this.joiner = joiner;
        this.updateState = updateState;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.state = (KeyValueStore<K, V2>) context.getStateStore(stateName);
    }

    @Override
    public KeyValue<K, V2> transform(K key, V1 value) {
        V2 stateValue = this.state.get(key); // Get current state
        V2 updatedValue = joiner.apply(value, stateValue); // Apply join
        if (updateState) {
            this.state.put(key, updatedValue); // write new state
        }
        return new KeyValue<>(key, updatedValue);
    }

    @Override
    public KeyValue<K, V2> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {}
}

And here's the adapted topology:

String topic = config.topic();
String store = topic + "-store";

KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic, store);

// Receive a stream of various events
topology.eventsStream()
    // Only process events that are implementing MyEvent
    .filter((k, v) -> v instanceof MyEvent)
    // Cast to ease the code
    .mapValues(v -> (MyEvent) v)
    // rekey by data id
    .selectKey((k, v) -> v.data.id)
    // join the event with the according entry in the KTable and apply the state mutation
    .transform(() -> new KStreamStateLeftJoin<UUID, MyEvent, MyData>(store, eventHandler::handleEvent, true), store)
    // write the updated state to the KTable.
    .to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

As we're using the KTable's KV StateStore and applying changes directly to it through the put method, events should always pick up the updated state. One thing I'm still wondering: what if I have a continuous high throughput of events?

Could there still be a race condition between the puts we do on the KTable's KV store and the writes that are done in the KTable's topic?

Asked by Crystark, Sep 14 '17


1 Answer

A KTable is sharded into multiple physical stores, and each store is only updated by a single thread. Thus, the scenario you describe cannot happen. If you have 2 records with the same timestamp that both update the same shard, they will be processed one after the other (in offset order). Thus, the second update will see the state after the first update.

So maybe you just didn't describe your scenario correctly?

Update

You cannot mutate the state when doing a join. Thus, the expectation that

event1 joins with state A => state A mutated to state X

is wrong. Independent of any processing order, when event1 joins with state A, it will access state A in read-only mode, and state A will not be modified.

Thus, when event2 joins, it will see the same state as event1. For a stream-table join, the table state is only updated when new data is read from the table-input topic.

If you want to have a shared state that is updated from both inputs, you would need to build a custom solution using transform():

builder.addStore(..., "store-name");
builder.stream("table-topic").transform(..., "store-name"); // will not emit anything downstream
KStream result = builder.stream("stream-topic").transform(..., "store-name");

This will create one store that is shared by both processors and both can read/write as they wish. Thus, for the table-input you can just update the state without sending anything downstream, while for the stream-input you can do the join, update the state, and send a result downstream.
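
To make this concrete, here is a minimal, self-contained sketch of that shared-store pattern (my own illustration, not code from the answer). It assumes Kafka Streams 2.x (StreamsBuilder, a Transformer without punctuate), String keys and values with default serdes configured in StreamsConfig, and placeholder names ("table-topic", "stream-topic", "shared-store", "output-topic"); the "join" is plain string concatenation just for illustration:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class SharedStoreJoinSketch {

    // Reads and writes the shared store; the table side only stores, the stream side joins and emits.
    static class SharedStoreTransformer implements Transformer<String, String, KeyValue<String, String>> {
        private final String storeName;
        private final boolean isStreamSide;
        private KeyValueStore<String, String> store;

        SharedStoreTransformer(String storeName, boolean isStreamSide) {
            this.storeName = storeName;
            this.isStreamSide = isStreamSide;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, String>) context.getStateStore(storeName);
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            if (!isStreamSide) {
                store.put(key, value);   // table side: keep the latest value only
                return null;             // returning null forwards nothing downstream
            }
            String current = store.get(key);                                   // current state, may be null
            String updated = current == null ? value : current + "|" + value;  // placeholder join logic
            store.put(key, updated);                                           // later stream records see this immediately
            return KeyValue.pair(key, updated);
        }

        @Override
        public void close() {}
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        String storeName = "shared-store";

        // One store, attached to both transform() calls below so they share it.
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(storeName), Serdes.String(), Serdes.String()));

        // Table-side input: only maintains the store, emits nothing.
        // (Assumes default String serdes are configured in the application's StreamsConfig.)
        builder.<String, String>stream("table-topic")
               .transform(() -> new SharedStoreTransformer(storeName, false), storeName);

        // Stream-side input: joins against the store, updates it, and emits the result.
        KStream<String, String> result = builder.<String, String>stream("stream-topic")
               .transform(() -> new SharedStoreTransformer(storeName, true), storeName);

        result.to("output-topic");
    }
}

Because the table-side transform() returns null, it never forwards anything; it only keeps the store up to date. The stream-side transform() reads, joins, writes back, and emits, so an update made for one record is visible to the next record with the same key within the same task.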

Update 2

With regard to the solution, there will be no race condition between the updates the Transformer applies to the state and records the Transformer processes after the state update. This part will be executed in a single thread, and records will be processed in offset-order from the input topic. Thus, it's ensured that a state update will be available to later records.

Answered by Matthias J. Sax