
Kafka Streams: joining two KTables invokes the join function twice

I am trying to join two KTables.

KTable<String, RecordBean> recordsTable = builder.table(Serdes.String(),
    new JsonPOJOSerde<>(RecordBean.class),
    bidTopic, RECORDS_STORE);

KTable<String, ImpressionBean> impressionsTable = builder.table(Serdes.String(),
    new JsonPOJOSerde<>(ImpressionBean.class),
    impressionTopic, IMPRESSIONS_STORE);

KTable<String, RecordBean> mergedByTxId = recordsTable
    .join(impressionsTable, merge());

The merge function is very simple: it just copies a value from one bean to the other.

public static <K extends BidInfo, V extends BidInfo> ValueJoiner<K, V, K> merge() {
    // Copy the winning bid amount from the right-hand value into the left-hand value.
    return (v1, v2) -> {
        v1.setRtbWinningBidAmount(v2.getRtbWinningBidAmount());
        return v1;
    };
}

But for some reason the join function is called twice for a single produced record. The streams and producer configuration is below.

Properties streamsConfiguration = new Properties();
streamsConfiguration
    .put(StreamsConfig.APPLICATION_ID_CONFIG, "join-impressions");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zookeeperConnect());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, folder.newFolder("kafka-streams-tmp")
    .getAbsolutePath());

return streamsConfiguration;

Producer config:

Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return producerConfig;

Next, I submit a single record to each topic. Both records have the same key, so I expect to receive a single record as output.

IntegrationTestUtils.produceKeyValuesSynchronously(bidsTopic,
    Arrays.asList(new KeyValue("1", getRecordBean("1"))),
    getProducerProperties());

IntegrationTestUtils.produceKeyValuesSynchronously(impressionTopic,
    Arrays.asList(new KeyValue("1", getImpressionBean("1"))),
    getProducerProperties());

List<KeyValue<String, String>> parsedRecord =
    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(),
        outputTopic, 1);

But the ValueJoiner is triggered twice, and I get two identical output records instead of one. At trigger time the values from both topics already exist, and I cannot work out what triggers the second execution.

Without the join, I cannot reproduce this behavior. I could not find a working example of a join between two KTables, so I cannot tell what is wrong with my approach.

Here is a simple example that demonstrates the same behavior:

KStreamBuilder builder = new KStreamBuilder();

KTable<String, String> first = builder.table("stream1", "storage1");
KTable<String, String> second = builder.table("stream2", "storage2");

KTable<String, String> joined = first.join(second, (value1, value2) -> value1);

joined.to("output");

KafkaStreams streams = new KafkaStreams(builder, getStreamingProperties());

streams.start();

IntegrationTestUtils.produceKeyValuesSynchronously("stream1",
    Arrays.asList(new KeyValue("1", "first stream")),
    getProducerProperties());

IntegrationTestUtils.produceKeyValuesSynchronously("stream2",
    Arrays.asList(new KeyValue("1", "second stream")),
    getProducerProperties());

List<KeyValue<String, String>> parsedRecord =
    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(),
        "output", 1);
Дмитрий Карпов asked Jan 02 '17 16:01




1 Answer

I got the following explanation after posting a similar question to the Confluent mailing list.

I think this might be related to caching. The caches for the 2 tables are flushed independently, so there is a chance you will get the same record twice. If stream1 and stream2 both receive a record for the same key, and the cache flushes, then:

The cache from stream1 will flush, perform the join, and produce a record.

The cache from stream2 will flush, perform the join, and produce a record.

Technically this is ok as the result of the join is another KTable, so the value in the KTable will be the correct value.
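The two flush steps described above can be pictured with a small toy model. This is not Kafka Streams code and does not reflect its internals; the class name `TwoCacheJoinSketch` and the `flush` helper are made up for illustration. It only assumes that each table holds the latest value per key and that flushing one side looks up each key on the other side and applies the joiner.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: NOT Kafka Streams internals. It mimics two tables whose
// pending updates are flushed independently, each flush triggering the join.
public class TwoCacheJoinSketch {

    static List<String> simulate() {
        Map<String, String> left = new HashMap<>();   // latest value per key, first table
        Map<String, String> right = new HashMap<>();  // latest value per key, second table

        // Both records arrive before either cache is flushed.
        left.put("1", "first stream");
        right.put("1", "second stream");

        List<String> output = new ArrayList<>();
        flush(left, right, output, true);   // flush of the first table's cache
        flush(right, left, output, false);  // flush of the second table's cache
        return output;
    }

    // Hypothetical helper: for every key on the flushed side, look up the other
    // side and apply the joiner from the question, (value1, value2) -> value1.
    static void flush(Map<String, String> flushed, Map<String, String> other,
                      List<String> output, boolean flushedIsLeft) {
        for (Map.Entry<String, String> entry : flushed.entrySet()) {
            String otherValue = other.get(entry.getKey());
            if (otherValue == null) {
                continue; // this toy model simply skips keys with no match
            }
            String value1 = flushedIsLeft ? entry.getValue() : otherValue;
            output.add(entry.getKey() + "=" + value1);
        }
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

Because both values are already present when each side flushes, each flush produces the same joined result, which matches the two identical output records seen in the question.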

After setting StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0, the issue was resolved. I still get two records, but now one of them is joined with null, which is much clearer behavior according to the documented join semantics.
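A minimal sketch of that setting, using only `java.util.Properties` so it runs without the Kafka jars. The string `"cache.max.bytes.buffering"` is the key behind `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG`; the class name `NoCacheConfigSketch` and the `withCachingDisabled` helper are hypothetical. In the test from the question, the same `put` would simply be added to `streamsConfiguration`.

```java
import java.util.Properties;

public class NoCacheConfigSketch {

    // Key behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, written as a
    // literal here so the sketch compiles without the Kafka Streams dependency.
    static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";

    // Hypothetical helper: returns a copy of the given settings with the
    // record cache disabled (size 0).
    static Properties withCachingDisabled(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(CACHE_MAX_BYTES_BUFFERING, 0);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "join-impressions");
        Properties props = withCachingDisabled(base);
        System.out.println(props.get(CACHE_MAX_BYTES_BUFFERING));
    }
}
```

With the cache at 0, every update is forwarded downstream immediately instead of being held back until a flush, so expect more output records in general.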

Дмитрий Карпов answered Nov 12 '22 02:11