 

Use Kafka to detect changes in values

I have a streaming application that continuously takes in a stream of coordinates along with some custom metadata, which includes a bitstring. This stream is produced onto a Kafka topic using the Producer API. Another application now needs to process this stream (with the Streams API), store a specific bit from the bitstring, and generate alerts when that bit changes.

Below is the continuous stream of messages that needs to be processed:

{"device_id":"1","status_bit":"0"}
{"device_id":"2","status_bit":"1"}
{"device_id":"1","status_bit":"0"}
{"device_id":"3","status_bit":"1"}
{"device_id":"1","status_bit":"1"} // need to generate an alert with change 0->1
{"device_id":"3","status_bit":"1"}
{"device_id":"2","status_bit":"1"}
{"device_id":"3","status_bit":"0"} // need to generate an alert with change 1->0
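To make the expected behaviour concrete, here is a minimal plain-Java sketch of the detection logic itself, with no Kafka involved. It assumes records are processed in order per device and that the first record for a device only sets a baseline rather than raising an alert. The HashMap plays the role a Kafka Streams state store would play; Reading, Change and detectChanges are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangeDetection {

    // One input record: the device id and its current status bit.
    record Reading(String deviceId, String statusBit) {}

    // One detected change: the previous ("init") and new ("final") bit for a device.
    record Change(String deviceId, String init, String fin) {}

    // Remembers the last bit seen per device (the job a state store would do in
    // Kafka Streams) and emits a Change only when a device's bit differs from it.
    static List<Change> detectChanges(List<Reading> readings) {
        Map<String, String> lastBit = new HashMap<>();
        List<Change> changes = new ArrayList<>();
        for (Reading r : readings) {
            // put() returns the previous value, or null for a device's first reading
            String previous = lastBit.put(r.deviceId(), r.statusBit());
            if (previous != null && !previous.equals(r.statusBit())) {
                changes.add(new Change(r.deviceId(), previous, r.statusBit()));
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        // The sample stream from the question, in arrival order.
        List<Reading> stream = List.of(
                new Reading("1", "0"), new Reading("2", "1"), new Reading("1", "0"),
                new Reading("3", "1"), new Reading("1", "1"), new Reading("3", "1"),
                new Reading("2", "1"), new Reading("3", "0"));
        for (Change c : detectChanges(stream)) {
            System.out.println(c.deviceId() + ": " + c.init() + "->" + c.fin());
        }
    }
}
```

Running main prints "1: 0->1" and "3: 1->0", the two alerts marked in the sample above. The key point is that the comparison needs the previous value, so whatever stores the bit has to be read before it is overwritten.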

Now I would like to write these alerts to another Kafka topic, like:

{"device_id":1,"init":0,"final":1,"timestamp":"somets"}
{"device_id":3,"init":1,"final":0,"timestamp":"somets"}
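A small sketch of how an alert value in exactly this shape could be built, assuming a hypothetical Alert record (the field is called fin because final is a Java keyword). The hand-built JSON is for illustration only and assumes the timestamp contains no characters that need escaping; a real application would more likely use a JSON library or a custom Serde.

```java
public class AlertJson {

    // Hypothetical alert payload matching the shape shown in the question.
    record Alert(int deviceId, int init, int fin, String timestamp) {

        // Builds the JSON by hand; the timestamp is inserted without escaping.
        String toJson() {
            return String.format(
                    "{\"device_id\":%d,\"init\":%d,\"final\":%d,\"timestamp\":\"%s\"}",
                    deviceId, init, fin, timestamp);
        }
    }

    public static void main(String[] args) {
        System.out.println(new Alert(1, 0, 1, "somets").toJson());
        System.out.println(new Alert(3, 1, 0, "somets").toJson());
    }
}
```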

I can save the current bit in a state store using something like:

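streamsBuilder
        // assumes keys are device ids and values are deserialized Device objects (hypothetical type)
        .<String, Device>stream("my-topic")
        .mapValues((key, value) -> value.getStatusBit())
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
        // keeps only the latest bit per device and window; the previous value is discarded
        .reduce((oldAggValue, newMessageValue) -> newMessageValue, Materialized.as("bit-temp-store"));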

But I can't figure out how to detect a change relative to the previously stored bit. Do I need to query the state store from inside the processor topology? If so, how? If not, what else could be done?
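One detail of the windowed reduce above is worth checking: with TimeWindows.of(Duration.ofMinutes(1)) the store holds a separate value per device and per one-minute window. The sketch below shows how a record timestamp maps to its window start, assuming epoch-aligned tumbling windows (my understanding of how Kafka Streams aligns TimeWindows) and non-negative timestamps; windowStart is a hypothetical helper, not a Kafka API.

```java
public class TumblingWindows {

    // Start of the tumbling window a timestamp falls into, assuming windows are
    // aligned to the epoch and their advance equals their size.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        long first = 119_000L;   // 00:01:59 after the epoch
        long second = 121_000L;  // 00:02:01, only two seconds later
        // The two records land in different windows, so a per-window reduce()
        // keeps separate values and never compares the second with the first.
        System.out.println(windowStart(first, oneMinute));
        System.out.println(windowStart(second, oneMinute));
    }
}
```

This prints 60000 and then 120000. If the goal is simply "the last bit per device", a non-windowed aggregation is probably closer to what is needed, since a change that straddles a window boundary would otherwise go unnoticed.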

Any suggestions or ideas I can try (maybe completely different from what I am thinking) are also appreciated. I am new to Kafka, and thinking in terms of event-driven streams is eluding me.

Thanks in advance.

Nikhil Sahu asked Mar 29 '19 08:03


1 Answer

I am not sure this is the best approach, but in a similar task I used an intermediate entity to capture the state change. In your case it would be something like:

    streamsBuilder.<String, Device>stream("my-topic").groupByKey()
              .aggregate(DeviceState::new, new Aggregator<String, Device, DeviceState>() {
            @Override
            public DeviceState apply(String key, Device newValue, DeviceState state) {
                // Recompute the flag on every record so it is reset when the bit is unchanged;
                // otherwise every record after the first change would pass the filter below.
                state.setChanged(!newValue.getStatusBit().equals(state.getStatusBit()));
                state.setStatusBit(newValue.getStatusBit());
                state.setDeviceId(newValue.getDeviceId());
                state.setKey(key);
                return state;
            }
        }, TimeWindows.of(…) …).filter((s, t) -> (t.changed())).toStream();

The resulting stream contains only the updates where the bit changed, so it can be written to the alerts topic. You can also add attributes to DeviceState to initialise it first, depending on whether you want to send an event when the first record for a device arrives, etc.
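A minimal sketch of that initialisation idea, written as plain Java so it can run without Kafka. The DeviceState here is a hypothetical stand-in for the answer's class, extended with an initialised flag and the previous bit (useful for the alert's init/final fields); emitOnFirst is a hypothetical switch for whether the first record of a device counts as a change, which in a real Aggregator would more likely be a configuration field.

```java
public class DeviceStateAggregator {

    // Hypothetical DeviceState: tracks whether any record has been seen yet,
    // the previous bit, the current bit, and whether the last update changed it.
    static class DeviceState {
        boolean initialised;
        String previousBit;
        String statusBit;
        boolean changed;
    }

    // Aggregator body: the changed flag is recomputed on every record, so it goes
    // back to false when the bit stays the same.
    static DeviceState apply(String newBit, DeviceState state, boolean emitOnFirst) {
        if (!state.initialised) {
            state.changed = emitOnFirst;
            state.initialised = true;
        } else {
            state.changed = !newBit.equals(state.statusBit);
        }
        state.previousBit = state.statusBit;
        state.statusBit = newBit;
        return state;
    }

    public static void main(String[] args) {
        for (boolean emitOnFirst : new boolean[] {false, true}) {
            DeviceState state = new DeviceState();
            StringBuilder flags = new StringBuilder();
            for (String bit : new String[] {"0", "0", "1"}) {
                flags.append(apply(bit, state, emitOnFirst).changed).append(' ');
            }
            System.out.println("emitOnFirst=" + emitOnFirst + ": " + flags.toString().trim());
        }
    }
}
```

For the bit sequence 0, 0, 1 this prints "emitOnFirst=false: false false true" and "emitOnFirst=true: true false true", so only the choice about the first record differs between the two modes.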

Katya Gorshkova answered Sep 19 '22 06:09