I am trying to join two KTables.
    KTable<String, RecordBean> recordsTable = builder.table(Serdes.String(),
            new JsonPOJOSerde<>(RecordBean.class),
            bidTopic, RECORDS_STORE);
    KTable<String, ImpressionBean> impressionsTable = builder.table(Serdes.String(),
            new JsonPOJOSerde<>(ImpressionBean.class),
            impressionTopic, IMPRESSIONS_STORE);
    KTable<String, RecordBean> mergedByTxId = recordsTable
            .join(impressionsTable, merge());
The merge function is very simple: I am just copying a value from one bean to the other.
    public static <K extends BidInfo, V extends BidInfo> ValueJoiner<K, V, K> merge() {
        return (v1, v2) -> {
            // Copy the winning bid amount from the second bean into the first.
            v1.setRtbWinningBidAmount(v2.getRtbWinningBidAmount());
            return v1;
        };
    }
But for some reason the join function is called twice for a single produced record. Please see the streams and producer configuration below.
    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-impressions");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zookeeperConnect());
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
            folder.newFolder("kafka-streams-tmp").getAbsolutePath());
    return streamsConfiguration;
Producer config:
    Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
    producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return producerConfig;
Next, I submit a single record to each topic. Both records have the same key, so I expect to receive a single record as output.
    IntegrationTestUtils.produceKeyValuesSynchronously(bidsTopic,
            Arrays.asList(new KeyValue<>("1", getRecordBean("1"))),
            getProducerProperties());
    IntegrationTestUtils.produceKeyValuesSynchronously(impressionTopic,
            Arrays.asList(new KeyValue<>("1", getImpressionBean("1"))),
            getProducerProperties());
    List<KeyValue<String, String>> parsedRecord =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(),
                    outputTopic, 1);
But the ValueJoiner is triggered twice, and I get 2 identical output records instead of one. Both times it fires, the values from both tables are already present, and I cannot work out what triggers the second execution.
Without the join I cannot reproduce this behavior. I could not find any working example of a join between 2 KTables, so I cannot tell what is wrong with my approach.
Here is a simpler piece of code that demonstrates the same behavior:
    KStreamBuilder builder = new KStreamBuilder();
    KTable<String, String> first = builder.table("stream1", "storage1");
    KTable<String, String> second = builder.table("stream2", "storage2");
    // The joiner simply keeps the value from the first table.
    KTable<String, String> joined = first.join(second, (value1, value2) -> value1);
    joined.to("output");
    KafkaStreams streams = new KafkaStreams(builder, getStreamingProperties());
    streams.start();
    IntegrationTestUtils.produceKeyValuesSynchronously("stream1",
            Arrays.asList(new KeyValue<>("1", "first stream")),
            getProducerProperties());
    IntegrationTestUtils.produceKeyValuesSynchronously("stream2",
            Arrays.asList(new KeyValue<>("1", "second stream")),
            getProducerProperties());
    List<KeyValue<String, String>> parsedRecord =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(),
                    "output", 1);
I got the following explanation after posting a similar question to the Confluent mailing list.
I think this might be related to caching. The caches for the 2 tables are flushed independently, so there is a chance you will get the same record twice. If stream1 and stream2 both receive a record for the same key, and the cache flushes, then:
The cache from stream1 will flush, perform the join, and produce a record.
The cache from stream2 will flush, perform the join, and produce a record.
Technically this is ok as the result of the join is another KTable, so the value in the KTable will be the correct value.
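The flush sequence described in that explanation can be sketched with a toy model in plain Java. This is not Kafka Streams code: the class `TwoCacheFlushDemo`, the method `run`, and the string-concatenating joiner are all hypothetical, and the uncached branch (emitting a null result when the other side has no value yet) is an assumption included only for contrast.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: two maps stand in for the two KTable stores.
class TwoCacheFlushDemo {

    // Inner-join lookup: null when either side has no value for the key yet.
    static String joinFor(String key, Map<String, String> left, Map<String, String> right) {
        String l = left.get(key);
        String r = right.get(key);
        return (l == null || r == null) ? null : l + "+" + r;
    }

    static List<String> run(boolean cachingEnabled) {
        Map<String, String> left = new HashMap<>();
        Map<String, String> right = new HashMap<>();
        List<String> output = new ArrayList<>();

        if (cachingEnabled) {
            // Both updates are buffered before anything is forwarded downstream.
            left.put("1", "first");
            right.put("1", "second");
            // Each table's cache flushes independently; each flush runs the join.
            output.add("1=" + joinFor("1", left, right)); // flush of the left cache
            output.add("1=" + joinFor("1", left, right)); // flush of the right cache
        } else {
            // Assumed uncached behavior: every update is forwarded immediately.
            left.put("1", "first");
            output.add("1=" + joinFor("1", left, right)); // right side still empty
            right.put("1", "second");
            output.add("1=" + joinFor("1", left, right)); // both sides present
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println("cached:   " + run(true));
        System.out.println("uncached: " + run(false));
    }
}
```

In this model the cached run produces the same joined record twice, because both flushes see both values, while the uncached run produces one null result followed by one joined record.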
After setting StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0, the issue was resolved. I still get 2 records, but now one of them is the result of joining with null, which is much clearer behavior and consistent with the documented join semantics.
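A minimal sketch of that setting, written so it compiles without Kafka on the classpath: the literal key "cache.max.bytes.buffering" is the string behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, while the class `CacheDisabledConfig` and the helper `withCachingDisabled` are hypothetical names used only for illustration.

```java
import java.util.Properties;

// Hypothetical helper: copies an existing streams configuration and turns
// record caching off, so each table update is forwarded without buffering.
class CacheDisabledConfig {

    static Properties withCachingDisabled(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Same key as StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
        props.put("cache.max.bytes.buffering", 0);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "join-impressions");
        System.out.println(withCachingDisabled(base));
    }
}
```

In the test from the question, the equivalent would be one extra put on streamsConfiguration using the StreamsConfig constant. Disabling the cache trades fewer duplicate results for more intermediate updates downstream, so it may suit tests better than production workloads.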