I have a stream that is a composite of other streams
final KTable<Long, CompositeInfo> compositeInfoTable = compositeImcTable
    .leftJoin(
        compositeFundTable,
        (CompositeImc cimc, CompositeFund cf) -> {
            CompositeInfo newCandidate = new CompositeInfo();
            if (cimc != null) {
                newCandidate.imcName = cimc.imcName;
                newCandidate.imcID = cimc.imcID;
                if (cf != null) {
                    newCandidate.investments = cf.investments;
                }
            }
            return newCandidate;
        })
    .leftJoin(
        compositeGeographyTable,
        (CompositeInfo cinfo, CompositeGeography cg) -> {
            if (cg != null) {
                cinfo.regions = cg.regions;
            }
            return cinfo;
        })
    .leftJoin(
        compositeSectorTable,
        (CompositeInfo cinfo, CompositeSector cs) -> {
            if (cs != null) {
                cinfo.sectors = cs.sectors;
            }
            return cinfo;
        })
    .leftJoin(
        compositeClusterTable,
        (CompositeInfo cinfo, CustomCluster cc) -> {
            if (cc != null && cc.clusters != null) {
                cinfo.clusters = cc.clusters;
            }
            return cinfo;
        })
    .leftJoin(
        compositeAlphaClusterTable,
        (CompositeInfo cinfo, CompositeAlphaCluster cac) -> {
            if (cac != null) {
                cinfo.alphaClusters = cac.alphaClusters;
            }
            return cinfo;
        },
        Materialized.<Long, CompositeInfo, KeyValueStore<Bytes, byte[]>>as(this.storeName)
            .withKeySerde(Serdes.Long())
            .withValueSerde(compositeInfoSerde));
My issue relates to the left join between CompositeInfo and CustomCluster. CustomCluster looks like the following
KTable<Long, CustomCluster> compositeClusterTable = builder
    .stream(
        SUB_TOPIC_COMPOSITE_CLUSTER,
        Consumed.with(Serdes.Long(), compositeClusterSerde))
    .filter((k, v) -> v.clusters != null)
    .groupByKey(Serialized.with(Serdes.Long(), compositeClusterSerde))
    .reduce((aggValue, newValue) -> newValue);
A message in a custom cluster looks like
CustomCluster [clusterId=null, clusterName=null, compositeId=280, operation=null, clusters=[Cluster [clusterId=6041, clusterName=MyName]]]
So I assign the clusters collection from this object to the clusters field of the CompositeInfo object, joined on the compositeId.
What I am witnessing is that a CustomCluster message comes in for a given compositeId and is processed correctly, but then the old message containing the previous cluster (I am still investigating this) is processed again. Digging in, the problem happens in Kafka's internal KTableKTableRightJoin:
public void process(final K key, final Change<V1> change) {
    // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
    if (key == null) {
        return;
    }
    final R newValue;
    R oldValue = null;
    final V2 value2 = valueGetter.get(key);
    if (value2 == null) {
        return;
    }
    newValue = joiner.apply(change.newValue, value2);
    if (sendOldValues) {
        oldValue = joiner.apply(change.oldValue, value2);
    }
    context().forward(key, new Change<>(newValue, oldValue));
}
When the joiner returns the first time, newValue is updated correctly. But the code then enters the sendOldValues block and, as soon as the joiner returns again, newValue is updated once more, this time with the old cluster value.
So here are my questions:
UPDATE: Another thing I found: if I move this join up the chain of joins and remove the others, sendOldValues remains false. So if I have something like the following:
final KTable<Long, CompositeInfo> compositeInfoTable = compositeImcTable
    .leftJoin(
        compositeFundTable,
        (CompositeImc cimc, CompositeFund cf) -> {
            CompositeInfo newCandidate = new CompositeInfo();
            if (cimc != null) {
                newCandidate.imcName = cimc.imcName;
                newCandidate.imcID = cimc.imcID;
                if (cf != null) {
                    newCandidate.investments = cf.investments;
                }
            }
            return newCandidate;
        })
    .leftJoin(
        compositeClusterTable,
        (CompositeInfo cinfo, CustomCluster cc) -> {
            if (cc != null && cc.clusters != null) {
                cinfo.clusters = cc.clusters;
            }
            return cinfo;
        },
        Materialized.<Long, CompositeInfo, KeyValueStore<Bytes, byte[]>>as(this.storeName)
            .withKeySerde(Serdes.Long())
            .withValueSerde(compositeInfoSerde));
This gives me the correct result, but I suspect that putting any more chained joins after this one would show the same erroneous behavior.
I am not certain of anything at this point, but I think my problem lies in chained leftJoins and the way oldValue is calculated. Has anyone else run into this issue?
UPDATE
After much digging I realize that sendOldValues is internal to Kafka and not the cause of the issue I am experiencing. My issue is that newValue changes when the ValueJoiner for oldValue returns, and I suspect it is due to the reference semantics of Java object assignment.
This is what an incoming object looks like
CustomCluster [clusterId=null, clusterName=null, compositeId=280, operation=null, clusters=[Cluster [clusterId=6041, clusterName=Sunil 2]]]
clusters is declared as HashSet<Cluster> clusters = new HashSet<Cluster>();
It is then joined to an object
CompositeInfo [compositeName=BUCKET_NM-280, compositeID=280, imcID=19651, regions=null, sectors=null, clusters=[]]
The clusters field here is of the same type, but belongs to the CompositeInfo class.
When I join, I assign the clusters of the CustomCluster object to the CompositeInfo object:
(CompositeInfo cinfo, CustomCluster cc) -> {
    if (cc != null && cc.clusters != null) {
        cinfo.clusters = cc.clusters;
    }
    return cinfo;
}
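To check this suspicion outside of Kafka Streams, here is a minimal sketch (the classes below are simplified stand-ins, not my actual CompositeInfo/CustomCluster): calling a mutating joiner twice against the same left-hand object makes the result of the first call change retroactively.
import java.util.HashSet;
import java.util.Set;

public class AliasingDemo {

    // Simplified stand-ins for CompositeInfo and CustomCluster (illustration only)
    static class Info {
        Set<String> clusters = new HashSet<>();
    }

    static class ClusterMsg {
        Set<String> clusters;
        ClusterMsg(Set<String> clusters) { this.clusters = clusters; }
    }

    // Same shape as the joiner above: mutates and returns the left-hand object
    static Info join(Info cinfo, ClusterMsg cc) {
        if (cc != null && cc.clusters != null) {
            cinfo.clusters = cc.clusters;
        }
        return cinfo;
    }

    public static void main(String[] args) {
        Info stored = new Info(); // plays the role of the CompositeInfo fetched from the state store

        Info newValue = join(stored, new ClusterMsg(Set.of("6041"))); // "new" cluster
        Info oldValue = join(stored, new ClusterMsg(Set.of("6040"))); // "old" cluster, same stored object

        // Both print [6040]: the second call overwrote the first result,
        // because newValue, oldValue and stored are all the same instance.
        System.out.println(newValue.clusters);
        System.out.println(oldValue.clusters);
    }
}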
After stumbling on the same issue myself, I would like to provide a detailed answer as well as a simplified example that helps illustrate the problem.
@Bean
public Function<KTable<String, String>,
        Function<KTable<String, String>, Consumer<KTable<String, String>>>> processEvents() {
    return firstnames ->
            lastnames ->
                    titles -> firstnames
                            .mapValues(firstname -> new Salutation().withFirstname(firstname))
                            .join(lastnames, (salutation, lastname) -> salutation.withLastname(lastname))
                            .leftJoin(titles, (salutation, title) -> salutation.withTitle(title))
                            .toStream()
                            .foreach((key, salutation) -> log.info("{}: {}", key, salutation));
}
The example (which uses Spring Cloud Stream with the Kafka Streams binder) shows a common pattern where topic contents are merged into an accumulator object. In our case, a salutation (e.g. "Dear Ms. Smith") is accumulated/aggregated into a Salutation object by joining topics representing the firstname, lastname and an (optional) title.
It is important to note that in this example, the Salutation instance is a mutable object that is constructed step by step. When running such a piece of code, you will see that when changing a person's last name, the merge will always be "running behind". This means that if you publish a lastname event because Ms. Smith has just got married and is now called "Johnson", then Kafka Streams will again emit a Salutation representing "Ms. Smith", despite the fact that she changed her last name. It is only when you publish yet another event for the same person on the lastnames topic (e.g. "Miller") that "Dear Ms. Johnson" will be logged.
The reason for this behavior is found in a piece of code located in KTableKTableInnerJoin.java:
if (change.newValue != null) {
    newValue = joiner.apply(change.newValue, valueRight);
}
if (sendOldValues && change.oldValue != null) {
    oldValue = joiner.apply(change.oldValue, valueRight);
}
context().forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(resultTimestamp));
joiner is a ValueJoiner, which in our case can e.g. be (salutation, lastname) -> salutation.withLastname(lastname) as shown above. The problem with this piece of code is that if you use an accumulation pattern with a mutable accumulator object (in our case an instance of Salutation), which is (by design) reused for all the joins, then oldValue and newValue will be the same object. Moreover, since oldValue is computed afterwards, it will contain the old last name, which explains why the join output is running behind.
Therefore, it is critical that the object returned by the ValueJoiner is a fresh object each time, one which does not contain references to other mutable objects that might be shared (and therefore mutated). The safest approach is therefore to have the ValueJoiner return an immutable object.
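As a sketch of what that could look like for the example above (still assuming the hypothetical Salutation class), every with* method would return a brand-new instance instead of mutating this:
// Immutable variant of the accumulator: each with* method returns a new
// instance, so the newValue and oldValue produced by the joiners can never
// alias each other.
public final class Salutation {

    private final String firstname;
    private final String lastname;
    private final String title;

    public Salutation() {
        this(null, null, null);
    }

    private Salutation(String firstname, String lastname, String title) {
        this.firstname = firstname;
        this.lastname = lastname;
        this.title = title;
    }

    public Salutation withFirstname(String firstname) {
        return new Salutation(firstname, lastname, title);
    }

    public Salutation withLastname(String lastname) {
        return new Salutation(firstname, lastname, title);
    }

    public Salutation withTitle(String title) {
        return new Salutation(firstname, lastname, title);
    }
}
The joiner lambdas shown earlier stay exactly the same; only the accumulator changes, and the value forwarded as newValue is no longer affected when the joiner is subsequently invoked for oldValue.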
I would not consider this a bug of the library, since it has to compare the old and new state somehow, and taking a snapshot of a mutable object would require a deep copy. However, it would probably be worthwhile to have this mentioned in the documentation. Also, issuing a warning when oldValue == newValue would at least make people aware of the problem. I will check whether such improvements could be incorporated.