I am publishing JSON messages to Kafka, e.g.:
"UserID":111,"UpdateTime":06-13-2018 12:13:43.200Z,"Comments":2,"Like":10
"UserID":111,"UpdateTime":06-13-2018 12:13:40.200Z,"Comments":0,"Like":6
"UserID":222,"UpdateTime":06-13-2018 12:13:43.200Z,"Comments":1,"Like":10
"UserID":111,"UpdateTime":06-13-2018 12:13:44.600Z,"Comments":3,"Like":12
I would like to sort the messages based on UpdateTime within a 10-second time window using Kafka Streams and push the sorted messages back to another Kafka topic. I have created a stream that reads from the input topic, and then I create a TimeWindowedKStream after groupByKey(), where UserID is the key of the message (although it is not necessary to group by key before sorting, I could not find a way to apply windowedBy() directly on the stream). However, I am not able to go further and sort the messages within each 10-second window based on UpdateTime. My source code is:
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-sorting");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source = builder.stream("UnsortedMessages");
    TimeWindowedKStream<String, String> countss = source
            .groupByKey()
            .windowedBy(TimeWindows.of(10000L).until(10000L));

    /*
        SORTING CODE
    */

    outputMessage.toStream().to("SortedMessages", Produced.with(Serdes.String(), Serdes.Long()));

    final KafkaStreams streams = new KafkaStreams(builder.build(), props);
    final CountDownLatch latch = new CountDownLatch(1);

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-sorting-shutdown-hook") {
        @Override
        public void run() {
            streams.close();
            latch.countDown();
        }
    });

    try {
        streams.start();
        latch.await();
    } catch (Throwable e) {
        System.exit(1);
    }
    System.exit(0);
}
Many thanks in advance.
If you want to sort messages ignoring the key, it only makes sense to do this per partition, and only if the input topic has the same number of partitions as the output topic. For this case, you should extract the partition number and use it as the message key (cf. https://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information).
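A minimal sketch of that re-keying step (assuming Kafka Streams 2.0+ and reusing the source stream from your snippet); transform() exposes the record metadata via the ProcessorContext:

KStream<String, String> rekeyed = source.transform(() ->
        new Transformer<String, String, KeyValue<String, String>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;  // gives access to record metadata
            }

            @Override
            public KeyValue<String, String> transform(String key, String value) {
                // use the input partition number as the new message key
                return KeyValue.pair(String.valueOf(context.partition()), value);
            }

            @Override
            public void close() {}
        });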
For the sorting itself, it's more tricky. Note that Kafka Streams follows a "continuous output" model and, in the DSL, emits an update for each input record. Thus, it is better to use the Processor API. You would use a Processor with an attached store and put the records into the store, keeping a sorted in-memory view of the records (for example, ordered by UpdateTime). As stream time advances, you can emit "finished" windows in sorted order and delete the corresponding records from the store. I don't think you can build this using the DSL.
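Below is a rough sketch of such a Processor, under a couple of assumptions: record timestamps are derived from the UpdateTime field (e.g. via a custom TimestampExtractor, so context().timestamp() reflects UpdateTime), and the records are buffered in a plain in-memory TreeMap for brevity; a fault-tolerant version would keep them in the attached state store instead. Topic names are taken from your question.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class SortingProcessor extends AbstractProcessor<String, String> {

    private static final long WINDOW_SIZE_MS = 10_000L;

    // record timestamp -> records with that timestamp, kept in timestamp order
    private final TreeMap<Long, List<KeyValue<String, String>>> buffer = new TreeMap<>();

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        // every 10 seconds of stream time, check whether a window has closed
        context.schedule(WINDOW_SIZE_MS, PunctuationType.STREAM_TIME, this::emitClosedWindows);
    }

    @Override
    public void process(String key, String value) {
        // buffer the record under its (UpdateTime-based) timestamp
        buffer.computeIfAbsent(context().timestamp(), t -> new ArrayList<>())
              .add(KeyValue.pair(key, value));
    }

    private void emitClosedWindows(long streamTime) {
        long currentWindowStart = streamTime - (streamTime % WINDOW_SIZE_MS);
        // everything before the current window start belongs to a finished window
        Map<Long, List<KeyValue<String, String>>> closed = buffer.headMap(currentWindowStart);
        for (List<KeyValue<String, String>> records : closed.values()) {
            for (KeyValue<String, String> record : records) {
                context().forward(record.key, record.value);  // emit in timestamp order
            }
        }
        closed.clear();  // drop the emitted records from the buffer
    }
}

Wired into a topology (replacing the DSL part of your program), it would look roughly like this:

Topology topology = new Topology();
topology.addSource("Source", "UnsortedMessages")
        .addProcessor("Sort", SortingProcessor::new, "Source")
        .addSink("Sink", "SortedMessages", "Sort");
KafkaStreams streams = new KafkaStreams(topology, props);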