Kafka-Streams Join 2 topics with JSON values | backpressure mechanism?

I'm learning Kafka Streams and am trying to achieve the following:

I created two Kafka topics (say topic1 and topic2) with null as the key and a JSON string as the value. Each record in topic1 (no duplicates) has multiple matching entries in topic2, i.e. topic1 holds the main data that should produce multiple new records when joined with topic2.

Example:

topic1 = {"name": "abc", "age": 2}, {"name": "xyz", "age": 3}, and so on.
topic2 = {"name": "abc", "address": "xxxxxx"}, {"name": "abc", "address": "yyyyyy"}, {"name": "xyz", "address": "jjjjjj"}, {"name": "xyz", "address": "xxxkkkkk"}

Expected output: {"name": "abc", "age": 2, "address": "xxxxxx"}, {"name": "abc", "age": 2, "address": "yyyyyy"}, {"name": "xyz", "age": 3, "address": "jjjjjj"}, {"name": "xyz", "age": 3, "address": "xxxkkkkk"}

I would like to persist/hold the stream from topic1 for future reference, while the stream from topic2 is only used for the join above and doesn't need to be retained.

I have a few questions: 1) Can I hold/store the topic1 stream for a few days (is that possible?) so that incoming records from topic2 can be joined against it? 2) Should I use a KStream or a KTable to achieve this? 3) Is this what's called a backpressure mechanism?

Does Kafka Streams support this use case, or should I look at something else? Please suggest.

I have tried a piece of code using KStream with a 5-minute window, but it looks like I'm not able to hold the topic1 data in the stream.

Please help me pick the right abstraction and join. I'm using Confluent's Kafka in a Docker instance.

public void run() {
    final StreamsBuilder builder = new StreamsBuilder();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
    final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);

    // Hold data from this topic for 30 days.
    KStream<String, JsonNode> cs = builder.stream("topic1", consumed);
    cs.foreach((k, v) -> System.out.println(k + " ---> " + v));

    // Data from this topic is used only once, at join time.
    KStream<String, JsonNode> css = builder.stream("topic2", consumed);
    css.foreach((k, v) -> System.out.println(k + " ---> " + v));

    // valueJoiner (merging the two JSON values) is defined elsewhere in the class.
    KStream<String, JsonNode> resultStream = cs.leftJoin(css,
            valueJoiner,
            JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
            Joined.with(
                    Serdes.String(), /* key */
                    jsonSerde,       /* left value */
                    jsonSerde)       /* right value */
    );

    resultStream.foreach((k, v) -> System.out.println("JOIN STREAM: KEY=" + k + ", VALUE=" + v));

    // `properties` holds the usual Streams configuration (application.id, bootstrap.servers, ...).
    KafkaStreams streams = new KafkaStreams(builder.build(), properties);
    streams.start();
}
asked by srikanth

1 Answer

Joins in Kafka are always based on keys.(*) Thus, to make any join work, you need to extract the fields you want to join on into the key before you do the actual join (the only partial exception would be a KStream-GlobalKTable join). In your code example, you won't get any results because all records have a null key and thus cannot be joined.
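
For example, with the JSON values shown in the question, the join key can be derived from the "name" field. A minimal sketch, reusing the cs stream from the code above:

// Re-key by the "name" field so records can be matched by name.
// Re-keying marks the stream for repartitioning before the join.
KStream<String, JsonNode> keyedCs = cs.selectKey((k, v) -> v.get("name").asText());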

For the join itself, it seems that a KStream-KTable join would be the right choice for your use case. To make this work, you will need to (see the sketch after this list):

  1. set the join key correctly for topic1 and write the data into an additional topic (let's call it topic1Keyed)
  2. read topic1Keyed as a table
  3. set the join key correctly for topic2
  4. join topic2 with the KTable

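Putting the four steps together, a minimal sketch that reuses the consumed and jsonSerde definitions from your code; topic1Keyed is just a name chosen here for illustration:

// 1. Re-key topic1 by "name" and write it to an additional topic.
builder.stream("topic1", consumed)
       .selectKey((k, v) -> v.get("name").asText())
       .to("topic1Keyed", Produced.with(Serdes.String(), jsonSerde));

// 2. Read topic1Keyed as a KTable. A table keeps the latest value per key
//    with no retention window, so the topic1 data stays available for
//    future joins (this answers question 1).
KTable<String, JsonNode> persons = builder.table("topic1Keyed",
        Consumed.with(Serdes.String(), jsonSerde));

// 3. Re-key topic2 by "name" as well.
KStream<String, JsonNode> addresses = builder.stream("topic2", consumed)
        .selectKey((k, v) -> v.get("name").asText());

// 4. Stream-table join: each topic2 record is joined once against the
//    current table state and then forgotten; no windowing is needed.
KStream<String, JsonNode> enriched = addresses.join(persons,
        (address, person) -> {
            ObjectNode merged = (ObjectNode) person.deepCopy();
            merged.set("address", address.get("address"));
            return merged;
        },
        Joined.with(Serdes.String(), jsonSerde, jsonSerde));

Note that topic1Keyed and topic2 must have the same number of partitions so that the join inputs are co-partitioned.
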
For full details about join semantics, check out this blog post: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

(*) UPDATE:

Since the 2.4 release, Kafka Streams also supports foreign-key table-table joins.
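
With a foreign-key join, both inputs can be modeled as tables and topic2 does not need to be re-keyed by name. A hedged sketch of the 2.4+ API, assuming topic2 records are given some unique key of their own (topic2Keyed is hypothetical):

// Hypothetical inputs: topic1Keyed keyed by name, topic2Keyed keyed by a
// unique address id.
KTable<String, JsonNode> persons = builder.table("topic1Keyed",
        Consumed.with(Serdes.String(), jsonSerde));
KTable<String, JsonNode> addresses = builder.table("topic2Keyed",
        Consumed.with(Serdes.String(), jsonSerde));

// Foreign-key table-table join (Kafka Streams 2.4+): each address row
// declares which person it references via the extractor; the result
// updates whenever either side changes.
KTable<String, JsonNode> enriched = addresses.join(persons,
        address -> address.get("name").asText(),  // foreign-key extractor
        (address, person) -> {
            ObjectNode merged = (ObjectNode) person.deepCopy();
            merged.set("address", address.get("address"));
            return merged;
        });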

answered by Matthias J. Sax