
Apache Kafka order windowed messages based on their value

I'm trying to find a way to re-order messages within a topic partition and send ordered messages to a new topic.

I have a Kafka publisher that sends String messages in the following format: {system_timestamp}-{event_name}?{parameters}

for example:

1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3
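For illustration, here is a minimal sketch of how a consumer might split such a message into its parts. The EventMessage class and its parse() method are hypothetical names introduced for this example, and the sketch assumes the timestamp never contains a '-' and that parameters are plain key=value pairs joined by '&'.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: splits "{system_timestamp}-{event_name}?{parameters}" into its parts. */
final class EventMessage {
    final long timestamp;
    final String eventName;
    final Map<String, String> parameters;

    private EventMessage(long timestamp, String eventName, Map<String, String> parameters) {
        this.timestamp = timestamp;
        this.eventName = eventName;
        this.parameters = parameters;
    }

    static EventMessage parse(String raw) {
        // Everything before the first '-' is the system timestamp.
        int dash = raw.indexOf('-');
        if (dash <= 0) {
            throw new IllegalArgumentException("Missing timestamp prefix: " + raw);
        }
        long timestamp = Long.parseLong(raw.substring(0, dash));

        // The event name runs up to the first '?', if there is one.
        int question = raw.indexOf('?', dash + 1);
        String eventName = question < 0
                ? raw.substring(dash + 1)
                : raw.substring(dash + 1, question);

        // Parameters are '&'-separated key=value pairs (no URL decoding in this sketch).
        Map<String, String> parameters = new LinkedHashMap<>();
        if (question >= 0 && question + 1 < raw.length()) {
            for (String pair : raw.substring(question + 1).split("&")) {
                int eq = pair.indexOf('=');
                if (eq < 0) {
                    parameters.put(pair, "");
                } else {
                    parameters.put(pair.substring(0, eq), pair.substring(eq + 1));
                }
            }
        }
        return new EventMessage(timestamp, eventName, parameters);
    }
}
```

Only the timestamp field matters for re-ordering; the other fields are parsed here just to show the full layout of the format.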

We also attach a message key to each message so that it is routed to the corresponding partition.

What I want to do is reorder events within a 1-minute window based on the {system_timestamp} part of the message, because our publishers don't guarantee that messages are sent in {system_timestamp} order.

For example, a message with a larger {system_timestamp} value may be delivered to the topic before one with a smaller value.

I've looked into the Kafka Streams API and found some examples of message windowing and aggregation:

    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> stream = builder.stream("events");
    KGroupedStream<String, String> groupedStream = stream.groupByKey(); // events grouped by key within the partition

    /* Commented out: I don't think I need any aggregation, but I guess I can't use time windowing without one.
    KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
            () -> "",  // initial value
            (aggKey, value, aggregate) -> aggregate + "",   // aggregating value
            TimeWindows.of(1000), // intervals in milliseconds
            Serdes.String(), // serde for aggregated value
            "test-store"
    );*/

But what should I do next with this grouped stream? I don't see any sort() method that takes a comparator such as (e1, e2) -> e1.compareTo(e2). Windows can be applied to methods like aggregate(), reduce(), and count(), but I don't think I need to manipulate the message data at all.

How can I re-order messages within a 1-minute window and send them to another topic?

MeetJoeBlack asked May 12 '17 13:05


1 Answer

Here's an outline:

Create a Processor implementation that:

  • in the process() method, for each message:

    • reads the timestamp from the message value
    • inserts the message into a KeyValueStore, using the (timestamp, message-key) pair as the key and the message value as the value. NB: this also provides de-duplication. You'll need to provide a custom Serde for the key that serializes the timestamp first, byte-wise, so that ranged queries are ordered by timestamp first.
  • in the punctuate() method:

    • reads the store using a ranged fetch from 0 to timestamp - 60,000 ms (i.e. 1 minute before the timestamp passed to punctuate())
    • sends the fetched messages in order using context.forward() and deletes them from the store
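The outline above can be sketched without any Kafka dependency to show the buffering logic. This is only a stand-in under stated assumptions, not the real Processor: ReorderBuffer, compositeKey() and WINDOW_MS are hypothetical names, a TreeMap ordered by unsigned byte comparison plays the role of the KeyValueStore, and the returned list plays the role of context.forward(). It also assumes non-negative timestamps, so that the big-endian encoding sorts byte-wise in numeric order, and Java 9+ for Arrays.compareUnsigned.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Library-free stand-in for the Processor outlined above (all names are hypothetical). */
final class ReorderBuffer {
    static final long WINDOW_MS = 60_000L;

    // Stand-in for the KeyValueStore: serialized keys compared as unsigned bytes,
    // the way a byte-ordered store would sort them.
    private final TreeMap<byte[], String> store =
            new TreeMap<byte[], String>(Arrays::compareUnsigned);

    /** Composite key: 8-byte big-endian timestamp first, then the UTF-8 message key. */
    static byte[] compositeKey(long timestamp, String messageKey) {
        byte[] keyBytes = messageKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + keyBytes.length)
                .putLong(timestamp) // ByteBuffer writes big-endian by default
                .put(keyBytes)
                .array();
    }

    /** process(): read the timestamp prefix from the value and buffer the message. */
    void process(String messageKey, String value) {
        long timestamp = Long.parseLong(value.substring(0, value.indexOf('-')));
        // A repeated (timestamp, message-key) pair overwrites the earlier entry.
        store.put(compositeKey(timestamp, messageKey), value);
    }

    /** punctuate(): emit, in timestamp order, every message strictly older than streamTime - 1 minute. */
    List<String> punctuate(long streamTime) {
        byte[] upperBound = compositeKey(streamTime - WINDOW_MS, "");
        List<String> forwarded = new ArrayList<>();
        Iterator<Map.Entry<byte[], String>> it =
                store.headMap(upperBound, false).entrySet().iterator();
        while (it.hasNext()) {
            forwarded.add(it.next().getValue()); // a real processor would call context.forward() here
            it.remove();                         // delete from the store once forwarded
        }
        return forwarded;
    }

    int size() {
        return store.size();
    }
}
```

Feeding it the two example messages from the question in arrival order and then calling punctuate() with a stream time more than a minute past both timestamps returns chat.started before client.message, i.e. in timestamp order rather than arrival order.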

The problem with this approach is that punctuate() is not triggered if no new messages arrive to advance the "stream time". If this is a risk in your case, you can create an external scheduler that sends periodic "tick" messages to each(!) partition of your topic. Your processor should simply ignore them, but they will cause punctuate() to trigger in the absence of "real" messages. KIP-138 will address this limitation by adding explicit support for system-time punctuation: https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics
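A rough sketch of such an external tick scheduler follows. The actual Kafka send is deliberately hidden behind a hypothetical BiConsumer (partition number, value), which in a real setup would wrap a producer call that targets an explicit partition. TickScheduler, TICK_VALUE and isTick() are likewise names made up for this example, and the partition count is assumed to be known up front.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Hypothetical external scheduler that sends a "tick" to every partition at a fixed rate. */
final class TickScheduler implements AutoCloseable {
    static final String TICK_VALUE = "tick"; // hypothetical marker the processor ignores

    private final int partitionCount;
    private final BiConsumer<Integer, String> sender; // assumed to wrap the real producer send
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    TickScheduler(int partitionCount, BiConsumer<Integer, String> sender) {
        this.partitionCount = partitionCount;
        this.sender = sender;
    }

    /** Sends one tick to each partition, so every partition's stream time can advance. */
    void sendTicks() {
        for (int partition = 0; partition < partitionCount; partition++) {
            sender.accept(partition, TICK_VALUE);
        }
    }

    /** Starts sending ticks to all partitions every periodMs milliseconds. */
    void start(long periodMs) {
        executor.scheduleAtFixedRate(this::sendTicks, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    /** The processor can use this check to skip ticks instead of buffering them. */
    static boolean isTick(String value) {
        return TICK_VALUE.equals(value);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

The processor would check isTick() at the top of process() and return early, so that ticks advance stream time without ever reaching the store or the output topic.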

Michal Borowiecki answered Oct 09 '22 11:10