I'm learning Kafka Streams and trying to achieve the following:
I created 2 Kafka topics (say topic1 and topic2) with null as the key and a JSON string as the value. Each record from topic1 (no duplicates) has multiple matching entries in topic2. That is, topic1 carries the main data, which should produce multiple new records when joined with topic2.
Example:
topic1={"name": "abc", "age":2}, {"name": "xyz", "age":3} and so on.
topic2={"name": "abc", "address"="xxxxxx"}, {"name": "abc", "address"="yyyyyy"}, {"name": "xyz", "address"="jjjjjj"}, {"name": "xyz", "address"="xxxkkkkk"}
Expected Output: {"name": "abc", "age":2, "address"="xxxxxx"}, {"name": "abc", "age":2, "address"="yyyyyy"}, {"name": "xyz", "age":3, "address"="jjjjjj"}, {"name": "xyz", "age":3, "address"="xxxkkkkk"}
I would like to persist/hold the data stream from topic1 for future reference, while the data stream from topic2 is only used to achieve the use case above and doesn't require any persistence/holding back.
I have a few questions:
1) Can I hold/store the topic1 data stream for a few days so that incoming records from topic2 can be joined against it? Is that possible?
2) What should I use to achieve this, KStream or KTable?
3) Is this called a backpressure mechanism?
Does Kafka Streams support this use case, or should I look for something else? Please suggest.
I have tried a piece of code using KStream with a 5-minute window, but it looks like I'm not able to hold the topic1 data in the stream.
Please help me with the right choice and join. I'm using Confluent Kafka in a Docker instance.
// Imports needed at the top of the class file:
//   com.fasterxml.jackson.databind.JsonNode
//   com.fasterxml.jackson.databind.node.ObjectNode
//   org.apache.kafka.common.serialization.Serde, Serdes
//   org.apache.kafka.connect.json.JsonSerializer, JsonDeserializer
//   org.apache.kafka.streams.KafkaStreams, StreamsBuilder
//   org.apache.kafka.streams.kstream.*, java.util.concurrent.TimeUnit
public void run() {
    final StreamsBuilder builder = new StreamsBuilder();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
    final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);

    // Hold data from this topic for 30 days.
    KStream<String, JsonNode> cs = builder.stream("topic1", consumed);
    cs.foreach((k, v) -> System.out.println(k + " ---> " + v));

    // Data is involved in a one-time process.
    KStream<String, JsonNode> css = builder.stream("topic2", consumed);
    css.foreach((k, v) -> System.out.println(k + " ---> " + v));

    // Merge the two JSON values; the right side can be null in a left join.
    final ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner = (left, right) -> {
        ObjectNode merged = ((ObjectNode) left).deepCopy();
        if (right != null) {
            merged.setAll((ObjectNode) right);
        }
        return merged;
    };

    KStream<String, JsonNode> resultStream = cs.leftJoin(css,
            valueJoiner,
            JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
            Joined.with(
                    Serdes.String(), /* key */
                    jsonSerde,       /* left value */
                    jsonSerde));     /* right value */

    resultStream.foreach((k, v) -> System.out.println("JOIN STREAM: KEY=" + k + ", VALUE=" + v));

    // "properties" holds the usual StreamsConfig settings (defined elsewhere).
    KafkaStreams streams = new KafkaStreams(builder.build(), properties);
    streams.start();
}
Joins in Kafka are always based on keys.(*) Thus, to make any join work, you need to extract the fields you want to join on into the key before you do the actual join (the only partial exception would be a KStream-GlobalKTable join). In your code example, you won't get any results because all records have a null key and therefore cannot be joined.
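For illustration, here is a minimal sketch of that re-keying step using selectKey (the field "name" and the topic names come from the question; the variable names and target topic handling are assumptions):

// Re-key topic1 on the "name" field so the records become joinable;
// assumes values shaped like {"name": "abc", "age": 2} and reuses the
// builder/jsonSerde from the question's snippet.
KStream<String, JsonNode> keyedTopic1 = builder
        .stream("topic1", Consumed.with(Serdes.String(), jsonSerde))
        .selectKey((nullKey, value) -> value.get("name").asText());

// Write the re-keyed data out so it can be read back as a KTable.
keyedTopic1.to("topic1Keyed", Produced.with(Serdes.String(), jsonSerde));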
For the join itself, it seems that a KStream-KTable join would be the right choice for your use case. To make this work, you will need to (see the sketch after this list):
- set the join key for topic1 and write the data into an additional topic (let's call it topic1Keyed)
- read topic1Keyed as a table
- set the join key for topic2 in the same way
- join the re-keyed topic2 stream with the KTable
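Putting those steps together, a minimal sketch might look like this (topic1Keyed comes from the list above; the output topic name and the merge logic are assumptions based on the question's JSON):

// Read topic1Keyed back as a changelog table keyed by "name".
KTable<String, JsonNode> table1 = builder.table("topic1Keyed",
        Consumed.with(Serdes.String(), jsonSerde));

// Re-key topic2 on "name" as well.
KStream<String, JsonNode> keyedTopic2 = builder
        .stream("topic2", Consumed.with(Serdes.String(), jsonSerde))
        .selectKey((nullKey, value) -> value.get("name").asText());

// Join the stream with the table: each topic2 record is enriched with the
// latest topic1 record for the same name. Note there is no window here.
KStream<String, JsonNode> enriched = keyedTopic2.join(table1,
        (address, person) -> {
            ObjectNode merged = ((ObjectNode) person).deepCopy();
            merged.setAll((ObjectNode) address);
            return merged;
        });

enriched.to("outputTopic", Produced.with(Serdes.String(), jsonSerde));

Because a KStream-KTable join is not windowed, the topic1 data is kept in the table's state store for as long as the underlying topic retains it, which also addresses your question about holding topic1 data for a few days.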
For full details about join semantics, check out this blog post: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
(*) UPDATE:
Since the 2.4 release, Kafka Streams also supports foreign-key table-table joins.
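For completeness, a sketch of what such a foreign-key join could look like for this use case (topic2Keyed is a hypothetical topic in which each topic2 record was first given a unique key of its own, since a table keeps only one row per key):

// Kafka Streams 2.4+ foreign-key table-table join.
KTable<String, JsonNode> persons = builder.table("topic1Keyed",
        Consumed.with(Serdes.String(), jsonSerde));
KTable<String, JsonNode> addresses = builder.table("topic2Keyed",
        Consumed.with(Serdes.String(), jsonSerde));

// The extractor pulls the "name" field out of each address row and uses it
// to look up the matching person row in the other table.
KTable<String, JsonNode> joined = addresses.join(persons,
        address -> address.get("name").asText(),   // foreign-key extractor
        (address, person) -> {
            ObjectNode merged = ((ObjectNode) person).deepCopy();
            merged.setAll((ObjectNode) address);
            return merged;
        });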