Kafka Stream Chained LeftJoin - Processing previous old message again after the new one

I have a KTable that is a composite of several other tables:

final KTable<Long, CompositeInfo> compositeInfoTable = compositeImcTable
    .leftJoin(
        compositeFundTable, 
        (CompositeImc cimc, CompositeFund cf) -> {
            CompositeInfo newCandidate = new CompositeInfo();
            if (cimc != null) {
                newCandidate.imcName = cimc.imcName;
                newCandidate.imcID = cimc.imcID;                                                                    
                if (cf != null) {
                    newCandidate.investments = cf.investments;
                }
            }
            return newCandidate;
        })
    .leftJoin(
        compositeGeographyTable, 
        (CompositeInfo cinfo, CompositeGeography cg) -> {
            if (cg != null) {
                cinfo.regions = cg.regions;
            }
            return cinfo;
        })
    .leftJoin(
        compositeSectorTable, 
        (CompositeInfo cinfo, CompositeSector cs) -> {
            if (cs != null) {
                cinfo.sectors = cs.sectors;
            }
            return cinfo;
        })
    .leftJoin(
        compositeClusterTable, 
        (CompositeInfo cinfo, CustomCluster cc) -> {
            if (cc != null && cc.clusters != null) {
                cinfo.clusters = cc.clusters;
            }
            return cinfo;
        })
    .leftJoin(
        compositeAlphaClusterTable, 
        (CompositeInfo cinfo, CompositeAlphaCluster cac) -> {
            if (cac != null) {
                cinfo.alphaClusters = cac.alphaClusters;
            }
            return cinfo;
        },
        Materialized.<Long, CompositeInfo, KeyValueStore<Bytes, byte[]>>as(this.storeName)
            .withKeySerde(Serdes.Long())
            .withValueSerde(compositeInfoSerde));

My issue relates to the left join between CompositeInfo and CustomCluster. The compositeClusterTable is built as follows:

KTable<Long, CustomCluster> compositeClusterTable = builder
    .stream(
        SUB_TOPIC_COMPOSITE_CLUSTER,
        Consumed.with(Serdes.Long(), compositeClusterSerde))
    .filter((k, v) -> v.clusters != null)
    .groupByKey(Serialized.with(Serdes.Long(), compositeClusterSerde))
    .reduce((aggValue, newValue) -> newValue);

An incoming CustomCluster message looks like this:

CustomCluster [clusterId=null, clusterName=null, compositeId=280, operation=null, clusters=[Cluster [clusterId=6041, clusterName=MyName]]]

So in the joiner I assign the clusters set of this object to the clusters field of the CompositeInfo object, joined on the compositeId.

What I am witnessing is that a CustomCluster message comes in for a given compositeId and is processed correctly, but then the old message containing the previous cluster (I am still investigating this) is processed again. Digging in, the problem happens in Kafka's internal KTableKTableRightJoin:

public void process(final K key, final Change<V1> change) {
    // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
    if (key == null) {
        return;
    }

    final R newValue;
    R oldValue = null;

    final V2 value2 = valueGetter.get(key);
    if (value2 == null) {
        return;
    }

    newValue = joiner.apply(change.newValue, value2);

    if (sendOldValues) {
        oldValue = joiner.apply(change.oldValue, value2);
    }

    context().forward(key, new Change<>(newValue, oldValue));
}

When the joiner returns the first time, newValue is updated correctly. But the code then enters the sendOldValues block, and as soon as the joiner returns again, newValue is updated a second time, this time with the old cluster value.

So here are my questions:

  1. Why is newValue getting updated when the joiner is called the second time with oldValue?
  2. Is there a way to turn sendOldValues off?
  3. Could my chained left joins have anything to do with it? I know previous versions of Kafka had a bug with chaining, but I am now on 1.0.

UPDATE: Another thing I found: if I move the join up the chain of joins and remove the others, sendOldValues remains false. So if I have something like the following:

final KTable<Long, CompositeInfo> compositeInfoTable = compositeImcTable
    .leftJoin(
        compositeFundTable, 
        (CompositeImc cimc, CompositeFund cf) -> {
            CompositeInfo newCandidate = new CompositeInfo();
            if (cimc != null) {
                newCandidate.imcName = cimc.imcName;
                newCandidate.imcID = cimc.imcID;
                if (cf != null) {
                    newCandidate.investments = cf.investments;
                }
            }   
            return newCandidate;
        })
    .leftJoin(
        compositeClusterTable, 
        (CompositeInfo cinfo, CustomCluster cc) -> {
            if (cc != null && cc.clusters != null) {
                cinfo.clusters = cc.clusters;
            }
            return cinfo;
        },
        Materialized.<Long, CompositeInfo, KeyValueStore<Bytes, byte[]>>as(this.storeName)
          .withKeySerde(Serdes.Long())
          .withValueSerde(compositeInfoSerde));

This gives me the correct result. But I suspect that any further joins chained after this one would show the same erroneous behavior.

I am not certain of anything at this point, but I think my problem lies in the chained leftJoins and the way oldValue is calculated. Has anyone else run into this issue?

UPDATE

After much digging I realize that sendOldValues is internal to Kafka and not the cause of the issue I am experiencing. My issue is that newValue changes when the ValueJoiner for oldValue returns, and I don't know if it is due to some pass-by-reference assignment of Java objects.
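
To test that suspicion outside of Kafka, here is a minimal plain-Java sketch (the class, field, and method names below are stand-ins for illustration, not the real ones): a joiner-style method that mutates and returns its left argument, called once for the new value and once for the old one, exactly as in the internal join code above.

import java.util.HashSet;
import java.util.Set;

public class AliasingDemo {

    // Stripped-down stand-in for CompositeInfo.
    static class CompositeInfo {
        Set<String> clusters = new HashSet<>();
    }

    // Stand-in for the ValueJoiner: mutates and returns its left argument,
    // just like the joiner in the question.
    static CompositeInfo join(CompositeInfo cinfo, Set<String> clusters) {
        cinfo.clusters = clusters;
        return cinfo;
    }

    public static void main(String[] args) {
        CompositeInfo shared = new CompositeInfo(); // one instance backs both calls

        CompositeInfo newValue = join(shared, Set.of("cluster-6041")); // "new" result
        CompositeInfo oldValue = join(shared, Set.of("cluster-1234")); // "old" result

        System.out.println(newValue == oldValue); // true: both point to the same object
        System.out.println(newValue.clusters);    // [cluster-1234]: the old value overwrote the new one
    }
}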

This is what an incoming object looks like:

CustomCluster [clusterId=null, clusterName=null, compositeId=280, operation=null, clusters=[Cluster [clusterId=6041, clusterName=Sunil 2]]]

Here clusters is declared as HashSet<Cluster> clusters = new HashSet<Cluster>();

It is then joined to an object like this:

CompositeInfo [compositeName=BUCKET_NM-280, compositeID=280, imcID=19651, regions=null, sectors=null, clusters=[]]

The clusters field here is of the same type, but belongs to the CompositeInfo class.

When I join, I assign the clusters of the CustomCluster object to the CompositeInfo object:

(CompositeInfo cinfo, CustomCluster cc) -> {
    if (cc != null && cc.clusters != null) {
        cinfo.clusters = cc.clusters;
    }
    return cinfo;
}
Fizi asked Nov 07 '22

1 Answer

After stumbling on the same issue myself, I would like to provide a detailed answer, as well as a simplified example that helps illustrate the problem.

  @Bean
  public Function<KTable<String, String>,
    Function<KTable<String, String>, Consumer<KTable<String, String>>>> processEvents() {
    return firstnames ->
      lastnames ->
        titles -> firstnames
          .mapValues(firstname -> new Salutation().withFirstname(firstname))
          .join(lastnames, (salutation, lastname) -> salutation.withLastname(lastname))
          .leftJoin(titles, (salutation, title) -> salutation.withTitle(title))
          .toStream()
          .foreach((key, salutation) -> log.info("{}: {}", key, salutation));
  }

The example (which uses Spring Cloud Stream with the Kafka Streams binder) shows a common pattern in which the contents of several topics are merged into an accumulator object. In our case, a salutation (e.g. "Dear Ms. Smith") is accumulated/aggregated into a Salutation object by joining topics representing the firstname, the lastname, and an (optional) title.

It is important to note that in this example, the Salutation instance is a mutable object that is constructed step by step. When running such a piece of code, you will see that when changing a person's last name, the merge will always be "running behind". This means that if you publish a lastname event because Ms. Smith has just got married and is now called "Johnson", then Kafka Streams will again emit a Salutation representing "Ms. Smith", despite the fact that she changed her last name. It is only when you publish yet another event for the same person on the lastnames topic (e.g. "Miller") that "Dear Ms. Johnson" will be logged.
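
Concretely, the sequence described above plays out like this (an illustrative timeline based on the description, not actual log output):

// Events published to the lastnames topic for the same key:
//   1. "Smith"   -> emitted: "Dear Ms. Smith"    (correct)
//   2. "Johnson" -> emitted: "Dear Ms. Smith"    (running behind)
//   3. "Miller"  -> emitted: "Dear Ms. Johnson"  (still one event behind)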

The reason for this behavior is found in a piece of code located in KTableKTableInnerJoin.java:

if (change.newValue != null) {
    newValue = joiner.apply(change.newValue, valueRight);
}

if (sendOldValues && change.oldValue != null) {
    oldValue = joiner.apply(change.oldValue, valueRight);
}

context().forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(resultTimestamp));

joiner is a ValueJoiner, which in our case can e.g. be (salutation, lastname) -> salutation.withLastname(lastname) as shown above. The problem with this piece of code is that if you use an accumulation pattern with a mutable accumulator object (in our case an instance of Salutation), which is (by design) reused for all the joins, then oldValue and newValue will be the same object. Moreover, since oldValue is computed afterwards, it will contain the old last name, which explains why the emitted result is running behind.

Therefore, it is critical that the object returned by the ValueJoiner is a fresh object each time, and that it does not contain references to other mutable objects that might be shared (and therefore mutated). The safest approach is to have the ValueJoiner return an immutable object.
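
For the example above, that could look like the following sketch of an immutable Salutation (the wither methods are inferred from the usage shown earlier; the real class is not part of this answer):

public final class Salutation {

    private final String firstname;
    private final String lastname;
    private final String title;

    public Salutation() {
        this(null, null, null);
    }

    private Salutation(String firstname, String lastname, String title) {
        this.firstname = firstname;
        this.lastname = lastname;
        this.title = title;
    }

    // Each "wither" copies the current state into a brand-new instance,
    // so oldValue and newValue can never alias each other.
    public Salutation withFirstname(String firstname) {
        return new Salutation(firstname, this.lastname, this.title);
    }

    public Salutation withLastname(String lastname) {
        return new Salutation(this.firstname, lastname, this.title);
    }

    public Salutation withTitle(String title) {
        return new Salutation(this.firstname, this.lastname, title);
    }
}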

I would not consider this a bug in the library, since it has to compare the old and new state somehow, and taking a snapshot of a mutable object would require a deep copy. However, it would probably be worthwhile to mention this in the documentation. Also, issuing a warning when oldValue == newValue would at least make people aware of the problem. I will check whether such improvements could be incorporated.
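
Applied to the joiner from the question, the same idea might look like this (a sketch only: the CompositeInfo copy constructor and the defensive copy of the set are assumptions, not part of the original code):

(CompositeInfo cinfo, CustomCluster cc) -> {
    // Hypothetical copy constructor producing a fresh CompositeInfo, so the
    // old and new values forwarded downstream never share an instance.
    CompositeInfo copy = new CompositeInfo(cinfo);
    if (cc != null && cc.clusters != null) {
        copy.clusters = new HashSet<>(cc.clusters); // defensive copy of the set
    }
    return copy;
}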

Cédric Schaller answered Nov 14 '22