KTable-KTable foreign-key join not producing all messages when topics have more than one partition

See the Update below for a potential workaround.

Our application consumes 2 topics as KTables, performs a left join, and outputs to a topic. During testing, we found that this works as expected when our output topic has only 1 partition. When we increase the number of partitions, we notice that the number of messages that get produced to the output topic decreases.

We tested this theory with multiple partition configurations prior to starting the app. With 1 partition, we see 100% of the messages. With 2, we see some messages (less than 50%). With 10, we see barely any (less than 10%).

Because we are left joining, every single message consumed from Topic 1 should be written to our output topic, but we're finding that this is not happening. It seems like messages are getting stuck in the "intermediate" topics created by the foreign-key join of the KTables, but there are no error messages.

Any help would be greatly appreciated!

Service.java

@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

    return (topicOne, topicTwo) ->
            topicOne
                    // Foreign-key left join: extract a MyOtherKey from each MyValue
                    // to look up the matching record in topicTwo
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}

build.gradle

plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}

...

ext {
    set('springCloudVersion', "Hoxton.SR6")
}

...

implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'

Note: We are excluding the org.apache.kafka dependencies due to a bug in the versions included in spring-cloud-stream
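For reference, a sketch of what that exclusion might look like in build.gradle (the explicitly pinned Kafka versions below are an assumption for illustration, not taken from our actual build):

implementation('org.springframework.cloud:spring-cloud-stream-binder-kafka-streams') {
    // exclude the buggy transitive Kafka artifacts pulled in by spring-cloud-stream
    exclude group: 'org.apache.kafka'
}
// pin known-good Kafka artifacts explicitly (versions here are assumed)
implementation 'org.apache.kafka:kafka-clients:2.5.1'
implementation 'org.apache.kafka:kafka-streams:2.5.1'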

application.yml

spring:
  application:
    name: app-name
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: topic1
          group: ${spring.application.name}
        process-in-1:
          destination: topic2
          group: ${spring.application.name}
        process-out-0:
          destination: outputTopic
      kafka:
        streams:
          binder:
            applicationId: ${spring.application.name}
            brokers: ${KAFKA_BROKERS}
            configuration:
              commit.interval.ms: 1000
              producer:
                acks: all
                retries: 20
              default:
                key:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            min-partition-count: 2

Test Scenario:

To provide a concrete example, if I publish the following 4 messages (each line shows the record key, then the value) to Topic 1:

{"fieldA": 1, "fieldB": 1},,{"fieldA": 1, "fieldB": 1}
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
{"fieldA": 4, "fieldB": 4},,{"fieldA": 4, "fieldB": 4}

The output topic will only receive 2 messages.

{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}

What happened to the other 2? It seems certain key/value pairs are just unable to get written to the output topic. Retrying these "lost" messages does not work either.

Update:

I was able to get this functioning properly by consuming Topic 1 as a KStream instead of a KTable and calling toTable() before going on to do the KTable-KTable join. I am still not sure why my original solution does not work, but hopefully this workaround can shed some light on the actual issue.

@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

    return (topicOne, topicTwo) ->
            topicOne
                    .map(...)
                    // calling toTable() after map() forces an internal repartition,
                    // so the table's input is partitioned by its key
                    .toTable()
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}
asked Jul 13 '20 by Mario P.

People also ask

What is the output of KStream KTable join?

Let's say there are 8,000 records in the KStream and 14 records in the KTable, and assume that for each key in the KStream there is a matching record in the KTable. The expected output would then be 8,000 records.
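A minimal sketch of such a KStream-KTable join (topic names and String serdes are assumptions for illustration):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> events = builder.stream("events");
KTable<String, String> reference = builder.table("reference-data");

// Inner KStream-KTable join: each incoming stream record is joined against the
// current table value for its key, so every matched stream record produces
// exactly one output record.
KStream<String, String> joined =
        events.join(reference, (eventValue, refValue) -> eventValue + " | " + refValue);

joined.to("joined-events");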

How does a KTable work?

A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if a corresponding key doesn't exist yet, the update will be considered an INSERT).
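As a minimal sketch (topic and store names are hypothetical): if (k, v1) and then (k, v2) arrive on the input topic, the table ends up holding k -> v2, and a (k, null) record acts as a tombstone that deletes the entry.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

StreamsBuilder builder = new StreamsBuilder();

// Each record on "user-profiles" is an UPDATE of the previous value for its
// key (an INSERT if the key is new); a null value is a DELETE (tombstone).
KTable<String, String> profiles = builder.table(
        "user-profiles",
        Consumed.with(Serdes.String(), Serdes.String()),
        Materialized.as("user-profiles-store"));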

How do you convert KStream to KTable?

You'll take an existing KStream object and use the toTable() method to convert it into a KTable. This method (new as of Apache Kafka 2.5) allows you to simply convert a record stream to a changelog stream. In this case you've materialized the KTable, so it's available for you to use with Interactive Queries.
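A short sketch of that conversion (topic and store names are hypothetical):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> stream =
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

// toTable() (Kafka Streams 2.5+) reinterprets the record stream as a changelog;
// materializing it with a named store makes it queryable via Interactive Queries.
KTable<String, String> table = stream.toTable(
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("events-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));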


1 Answer

Given the description of the problem, it seems that the data in the (left) KTable input topic is not correctly partitioned by its key. For a single-partition topic there is, well, only one partition: all data goes to it, and the join result is complete.

However, for a multi-partition input topic you need to ensure that the data is partitioned by key; otherwise, two records with the same key might end up in different partitions, and the join fails (as the join is done on a per-partition basis).
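To make that concrete, here is a sketch of the default partitioner's logic (the example key is hypothetical): the partition is derived from a hash of the serialized key bytes, so records with the same key only end up in the same partition if all producers serialize and partition them the same way.

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class PartitionForKey {

    // Default partitioner: murmur2 hash of the serialized key bytes, mapped to
    // a non-negative int, modulo the number of partitions.
    static int partitionFor(byte[] serializedKey, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "{\"fieldA\": 1, \"fieldB\": 1}".getBytes(StandardCharsets.UTF_8);
        System.out.println(partitionFor(key, 1));   // always 0: a 1-partition topic cannot mis-partition
        System.out.println(partitionFor(key, 10));  // some partition in 0..9
    }
}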

Note that even if a foreign-key join does not require that both input topics are co-partitioned, it is still required that each input topic itself is partitioned by its key!

If you use a map().toTable() you basically trigger an internal repartitioning of the data that ensures that the data gets partitioned by the key, and this fixes the problem.
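A sketch of that pattern with the question's types and an identity map() (the serde variables are assumptions):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

StreamsBuilder builder = new StreamsBuilder();

KTable<MyKey, MyValue> repartitioned = builder
        .stream("topic1", Consumed.with(myKeySerde, myValueSerde))
        // map() marks the stream as possibly re-keyed, so toTable() inserts an
        // internal repartition topic, written with the default partitioner and
        // therefore partitioned by key.
        .map(KeyValue::new)
        .toTable(Materialized.with(myKeySerde, myValueSerde));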

answered Oct 06 '22 by Matthias J. Sax