I'm having an issue with how the following topology behaves:
String topic = config.topic();
KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
// Receive a stream of various events
topology.eventsStream()
// Only process events that are implementing MyEvent
.filter((k, v) -> v instanceof MyEvent)
// Cast to ease the code
.mapValues(v -> (MyEvent) v)
// rekey by data id
.selectKey((k, v) -> v.data.id)
.peek((k, v) -> L.info("Event:"+v.action))
// join the event with the according entry in the KTable and apply the state mutation
.leftJoin(myTable, eventHandler::handleEvent, UUIDSerdes.get(), EventSerdes.get())
.peek((k, v) -> L.info("Updated:" + v.id + "-" + v.id2))
// write the updated state to the KTable.
.to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
My issue occurs when I receive different events at the same time. Because the state mutation is done by the leftJoin and then written by the to method, the following can occur if event 1 and event 2 are received at the same time with the same key:
event1 joins with state A => state A mutated to state X
event2 joins with state A => state A mutated to state Y
state X written to the KTable topic
state Y written to the KTable topic
Because of that, state Y doesn't have the changes from event1, so I lose data.
Here is what I see in the logs (the Processing:... part is logged from inside the value joiner):
Event:Event1
Event:Event2
Processing:Event1, State:none
Updated:1-null
Processing:Event2, State:none
java.lang.IllegalStateException: Event2 event received but we don't have data for id 1
Event1 can be considered the creation event: it creates the entry in the KTable, so it doesn't matter if the state is empty. Event2, though, needs to apply its changes to an existing state, but it doesn't find any because the first state mutation still hasn't been written to the KTable (it still hasn't been processed by the to method).
Is there any way to make sure that my leftJoin and my writes into the KTable are done atomically?
Thanks
Update & current solution
Thanks to the response of @Matthias, I was able to find a solution using a Transformer.
Here's what the code looks like.
The transformer:
public class KStreamStateLeftJoin<K, V1, V2> implements Transformer<K, V1, KeyValue<K, V2>> {
private final String stateName;
private final ValueJoiner<V1, V2, V2> joiner;
private final boolean updateState;
private KeyValueStore<K, V2> state;
public KStreamStateLeftJoin(String stateName, ValueJoiner<V1, V2, V2> joiner, boolean updateState) {
this.stateName = stateName;
this.joiner = joiner;
this.updateState = updateState;
}
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.state = (KeyValueStore<K, V2>) context.getStateStore(stateName);
}
@Override
public KeyValue<K, V2> transform(K key, V1 value) {
V2 stateValue = this.state.get(key); // Get current state
V2 updatedValue = joiner.apply(value, stateValue); // Apply join
if (updateState) {
this.state.put(key, updatedValue); // write new state
}
return new KeyValue<>(key, updatedValue);
}
@Override
public KeyValue<K, V2> punctuate(long timestamp) {
return null;
}
@Override
public void close() {}
}
And here's the adapted topology:
String topic = config.topic();
String store = topic + "-store";
KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic, store);
// Receive a stream of various events
topology.eventsStream()
// Only process events that are implementing MyEvent
.filter((k, v) -> v instanceof MyEvent)
// Cast to ease the code
.mapValues(v -> (MyEvent) v)
// rekey by data id
.selectKey((k, v) -> v.data.id)
// join the event with the according entry in the KTable and apply the state mutation
.transform(() -> new KStreamStateLeftJoin<UUID, MyEvent, MyData>(store, eventHandler::handleEvent, true), store)
// write the updated state to the KTable.
.to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
Because we're using the KTable's KV StateStore and applying changes directly to it through the put method, events should always pick up the updated state.
One thing I'm still wondering: what if I have a continuous high throughput of events?
Could there still be a race condition between the puts we do on the KTable's KV store and the writes that are done to the KTable's topic?
A KTable is sharded into multiple physical stores, and each store is only updated by a single thread. Thus, the scenario you describe cannot happen. If you have 2 records with the same timestamp that both update the same shard, they will be processed one after the other (in offset order). Thus, the second update will see the state after the first update.
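To illustrate why two records with the same key end up in the same shard, here is a minimal sketch in plain Java. It does not use the Kafka API: shardFor is a hypothetical stand-in for the partitioner (the real default partitioner hashes the serialized key bytes), and the shard count of 4 is an assumption. Any deterministic function of the key shows the same property.

```java
import java.util.UUID;

class ShardOrderSketch {
    // Hypothetical stand-in for a partitioner: a deterministic function of the key.
    // floorMod keeps the result in [0, numShards) even for negative hash codes.
    static int shardFor(UUID key, int numShards) {
        return Math.floorMod(key.hashCode(), numShards);
    }

    public static void main(String[] args) {
        int numShards = 4; // assumed number of partitions/shards
        UUID id = UUID.fromString("00000000-0000-0000-0000-000000000001");

        // Two events carrying the same key are always routed to the same shard,
        // so a single thread handles both, one after the other.
        int shardOfEvent1 = shardFor(id, numShards);
        int shardOfEvent2 = shardFor(id, numShards);
        System.out.println("same shard: " + (shardOfEvent1 == shardOfEvent2));
    }
}
```

Since the mapping depends only on the key, ordering between event1 and event2 is decided by their offsets within that one shard, not by their timestamps.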
So maybe you did not describe your scenario correctly?
Update
You cannot mutate the state when doing a join. Thus, the expectation that
event1 joins with state A => state A mutated to state X
is wrong. Independent of any processing order, when event1 joins with state A, it will access state A in read-only mode and state A will not be modified.
Thus, when event2 joins, it will see the same state as event1. For a stream-table join, the table state is only updated when new data is read from the table-input-topic.
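The behaviour can be reproduced with a small model in plain Java. This is only a sketch, not Kafka Streams code: StreamTableJoinModel, onEvent, and pollTableTopic are hypothetical names, the map stands in for the table state, and the queue stands in for the table-input-topic that to() writes into.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

class StreamTableJoinModel {
    // Table state: only changes when a record is consumed from the table topic.
    final Map<String, String> table = new HashMap<>();
    // Records written by to(topic) that the table has not read back yet.
    final Queue<String[]> tableTopic = new ArrayDeque<>();
    // What the joiner saw for each event, kept for inspection.
    final List<String> seen = new ArrayList<>();

    // Models leftJoin(...).to(topic): read-only lookup, result goes to the topic.
    void onEvent(String key, String event) {
        String state = table.get(key); // read-only access, null if no entry
        seen.add(event + " saw " + state);
        String updated = (state == null ? "" : state + "+") + event;
        tableTopic.add(new String[] {key, updated});
    }

    // Models the table consuming one record from its input topic.
    void pollTableTopic() {
        String[] record = tableTopic.poll();
        if (record != null) {
            table.put(record[0], record[1]);
        }
    }

    public static void main(String[] args) {
        StreamTableJoinModel m = new StreamTableJoinModel();
        m.onEvent("1", "Event1");
        m.onEvent("1", "Event2"); // the Event1 result has not been read back yet
        System.out.println(m.seen); // [Event1 saw null, Event2 saw null]
    }
}
```

Both events see an empty state because the join never writes to the table. Once both queued records are polled, the second overwrites the first, which matches the lost update described in the question.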
If you want to have a shared state that is updated from both inputs, you would need to build a custom solution using transform():
builder.addStore(..., "store-name");
builder.stream("table-topic").transform(..., "store-name"); // will not emit anything downstream
KStream result = builder.stream("stream-topic").transform(..., "store-name");
This will create one store that is shared by both processors and both can read/write as they wish. Thus, for the table-input you can just update the state without sending anything downstream, while for the stream-input you can do the join, update the state, and send a result downstream.
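As a rough model of that design (plain Java, not the Kafka Streams API), the two processors can be thought of as two methods working on one map. SharedStoreModel, onTableRecord, and onStreamRecord are hypothetical names, and BiFunction is used here only because it has the same shape as a ValueJoiner.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

class SharedStoreModel<K, E, S> {
    // Stands in for the single state store shared by both processors.
    private final Map<K, S> store = new HashMap<>();
    // Same shape as a ValueJoiner<E, S, S>: (event, current state) -> new state.
    private final BiFunction<E, S, S> joiner;

    SharedStoreModel(BiFunction<E, S, S> joiner) {
        this.joiner = joiner;
    }

    // Table-input processor: update the store and emit nothing downstream.
    void onTableRecord(K key, S value) {
        store.put(key, value);
    }

    // Stream-input processor: join, write the result back, and return it
    // as the record to send downstream.
    S onStreamRecord(K key, E event) {
        S updated = joiner.apply(event, store.get(key));
        store.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        SharedStoreModel<String, String, String> m =
            new SharedStoreModel<String, String, String>(
                (event, state) -> state == null ? event : state + "+" + event);
        m.onStreamRecord("1", "Event1");
        System.out.println(m.onStreamRecord("1", "Event2")); // Event1+Event2
    }
}
```

Because the read, the join, and the write happen in one step on the same store, the second event sees the result of the first without waiting for anything to be read back from a topic.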
Update 2
With regard to the solution, there will be no race condition between the updates the Transformer applies to the state and the records the Transformer processes after the state update. This part is executed in a single thread, and records are processed in offset order from the input topic. Thus, it is ensured that a state update will be available to later records.