We are building an application that collects data from sensors. The data is streamed to Kafka, from where consumers publish it to different data stores. Each data point has multiple attributes representing the state of the sensor.
In one of the consumers we want to publish the data to the data store only if the value has changed. For example, if a temperature sensor is polled every 10 seconds, we expect to receive data like:
----------------------------------------------------------------------
Key Value
----------------------------------------------------------------------
Sensor1 {timestamp: "10-10-2019 10:20:30", temperature: 10}
Sensor1 {timestamp: "10-10-2019 10:20:40", temperature: 10}
Sensor1 {timestamp: "10-10-2019 10:20:50", temperature: 11}
In the above case, only the first and third records should be published.
For this we need some way to compare the current value for a key with the previous value for the same key. I believe this should be possible with KTable or KStream, but I have been unable to find examples.
Any help will be great!
Here is an example of how to solve this with KStream#transformValues():
StreamsBuilder builder = new StreamsBuilder();
// State store holding the last forwarded value per sensor key.
StoreBuilder<KeyValueStore<String, YourValueType>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
        Serdes.String(),
        YourValueTypeSerde());
builder.addStateStore(keyValueStoreBuilder);
// Keys are sensor names, so the key serde must be String to match the store.
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), YourValueTypeSerde()))
    .transformValues(() -> new ValueTransformerWithKey<String, YourValueType, YourValueType>() {
        private KeyValueStore<String, YourValueType> state;
        @Override
        public void init(final ProcessorContext context) {
            state = (KeyValueStore<String, YourValueType>) context.getStateStore(stateStoreName);
        }
        @Override
        public YourValueType transform(final String key, final YourValueType value) {
            YourValueType prevValue = state.get(key);
            // Assumes temperature() returns a primitive; use equals() for boxed types.
            if (prevValue != null && prevValue.temperature() == value.temperature()) {
                return null; // unchanged: mark the record for removal
            }
            // First record for this key, or the temperature changed.
            state.put(key, value);
            return value;
        }
        @Override
        public void close() {}
    }, stateStoreName)
    // transformValues() forwards null values, so drop them explicitly.
    .filter((key, value) -> value != null)
    .to(OUTPUT_TOPIC);
Each record is compared with the previous record for the same key held in the state store. If there is no previous record, or the temperature is different, the current record is written to the state store and forwarded. If the temperature is equal, transform() returns null and the filter() step discards the record, because transformValues() still forwards a null value downstream.
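To check the comparison logic in isolation, without a running Kafka cluster, the rule "publish only the first record for a key or a record whose temperature changed" can be sketched in plain Java. This is only an illustration: the ChangeDetector class and its shouldPublish method are hypothetical names, and a HashMap stands in for the state store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the per-key state store: remembers the last
// published temperature for each sensor key.
final class ChangeDetector {
    private final Map<String, Integer> lastTemperature = new HashMap<>();

    // Returns true when the reading should be published: either it is the
    // first reading for this key, or its temperature differs from the stored one.
    boolean shouldPublish(String sensorId, int temperature) {
        Integer previous = lastTemperature.get(sensorId);
        if (previous != null && previous == temperature) {
            return false; // unchanged: drop the reading
        }
        lastTemperature.put(sensorId, temperature); // remember the new value
        return true;
    }
}
```

Feeding it the three Sensor1 readings from the question (10, 10, 11) gives publish, drop, publish, which matches the expected result of forwarding only the first and third records.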