See the Update below for a potential workaround.
Our application consumes 2 topics as KTables, performs a left join, and outputs to a topic. During testing, we found that this works as expected when our output topic has only 1 partition. When we increase the number of partitions, we notice that the number of messages that get produced to the output topic decreases.
We tested this theory with multiple partition configurations prior to starting the app. With 1 partition, we see 100% of the messages. With 2, we see some messages (less than 50%). With 10, we see barely any (less than 10%).
Because we are left joining, every single message consumed from Topic 1 should be written to our output topic, but we're finding that this is not happening. It seems like messages are getting stuck in the "intermediate" topics created by the foreign-key join of the KTables, but there are no error messages.
Any help would be greatly appreciated!
Service.java
@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
    return (topicOne, topicTwo) ->
        topicOne
            .leftJoin(topicTwo,
                value -> MyOtherKey.newBuilder()
                    .setFieldA(value.getFieldA())
                    .setFieldB(value.getFieldB())
                    .build(),
                this::enrich)
            .toStream();
}
build.gradle
plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}
...
ext {
    set('springCloudVersion', "Hoxton.SR6")
}
...
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'
Note: We are excluding the org.apache.kafka dependencies due to a bug in the versions included in spring-cloud-stream
application.yml
spring:
  application:
    name: app-name
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: topic1
          group: ${spring.application.name}
        process-in-1:
          destination: topic2
          group: ${spring.application.name}
        process-out-0:
          destination: outputTopic
      kafka:
        streams:
          binder:
            applicationId: ${spring.application.name}
            brokers: ${KAFKA_BROKERS}
            configuration:
              commit.interval.ms: 1000
              producer:
                acks: all
                retries: 20
              default:
                key:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            min-partition-count: 2
Test Scenario:
To provide a concrete example, if I publish the following 4 messages to Topic 1:
{"fieldA": 1, "fieldB": 1},,{"fieldA": 1, "fieldB": 1}
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
{"fieldA": 4, "fieldB": 4},,{"fieldA": 4, "fieldB": 4}
The output topic will only receive 2 messages.
{"fieldA": 2, "fieldB": 2},,{"fieldA": 2, "fieldB": 2}
{"fieldA": 3, "fieldB": 3},,{"fieldA": 3, "fieldB": 3}
What happened to the other 2? It seems certain key/value pairs are just unable to get written to the output topic. Retrying these "lost" messages does not work either.
Update:
I was able to get this functioning properly by consuming Topic 1 as a KStream instead of a KTable and calling toTable()
before going on to do the KTable-KTable join. I am still not sure why my original solution does not work, but hopefully this workaround can shed some light on the actual issue.
@Bean
public BiFunction<KStream<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
    return (topicOne, topicTwo) ->
        topicOne
            .map(...)
            .toTable()
            .leftJoin(topicTwo,
                value -> MyOtherKey.newBuilder()
                    .setFieldA(value.getFieldA())
                    .setFieldB(value.getFieldB())
                    .build(),
                this::enrich)
            .toStream();
}
Let's say there are 8,000 records in the KStream and 14 records in the KTable, and assume that for each key in the KStream there is a matching record in the KTable. The expected output would then be 8,000 records.
A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if a corresponding key doesn't exist yet, the update will be considered an INSERT).
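A small sketch of these semantics, using a hypothetical topic and types (not from the application above): if ("alice", "v1") arrives and later ("alice", "v2"), the table ends up holding alice -> "v2"; a record with a previously unseen key becomes a new entry.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class ChangelogSemanticsExample {

    public KTable<String, String> usersTable(StreamsBuilder builder) {
        // Hypothetical topic keyed by user id. Consuming it as a table means:
        //   ("alice", "v1") then ("alice", "v2")  -> the table holds alice -> "v2" (UPDATE)
        //   ("bob", "v1") with no prior "bob" key -> the table gains bob -> "v1" (INSERT)
        return builder.table("users-topic");
    }
}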
You'll take an existing KStream object and use the toTable() method to convert it into a KTable. This method (new as of Apache Kafka 2.5) allows you to simply convert a record stream to a changelog stream. In this case you've materialized the KTable, so it's available for you to use with Interactive Queries.
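A minimal sketch of that conversion, assuming hypothetical topic, store, and type names (none of these come from the application above):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class ToTableExample {

    public KTable<String, String> buildTable(StreamsBuilder builder) {
        // Hypothetical input topic; records are expected to be keyed.
        KStream<String, String> stream = builder.stream("input-topic");

        // toTable() (Kafka 2.5+) reinterprets the record stream as a changelog stream.
        // Materializing it with a named store makes the resulting KTable available
        // for Interactive Queries.
        return stream.toTable(
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("input-table-store"));
    }
}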
Given the description of the problem, it seems that the data in the (left) KTable input topic is not correctly partitioned by its key. For a single-partition topic there is only one partition, all data goes to it, and the join result is complete.
However, for a multi-partitioned input topic, you need to ensure that the data is partitioned by key, otherwise, two records with the same key might end up in different partitions and thus the join fails (as the join is done on a per-partition basis).
Note that even if a foreign-key join does not require that both input topics are co-partitioned, it is still required that each input topic itself is partitioned by its key!
If you use map().toTable(), you trigger an internal repartitioning of the data that ensures the data is partitioned by its key, and this fixes the problem.
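For illustration, a sketch of that repartition-via-map pattern with hypothetical topic and type names (the actual application uses its own Avro key/value types and its own map() logic):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class RepartitionBeforeJoinExample {

    public KTable<String, String> repartitionedTable(StreamsBuilder builder) {
        // Hypothetical input topic whose records may not be partitioned by their key.
        KStream<String, String> stream = builder.stream("topic1");

        // map() (like selectKey()) marks the stream as "key-changing", so the following
        // toTable() writes through an internal repartition topic. That guarantees all
        // records with the same key land in the same partition before the KTable is built,
        // which is what the per-partition join requires.
        return stream
                .map((key, value) -> KeyValue.pair(key, value))
                .toTable();
    }
}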