
Kafka time difference last two records, KSQL or other?

I'm evaluating Kafka. In our use case we would have to create new topics containing the time elapsed from one event to the next, since each sensor reports as "on" or "off" into Kafka. So, given the timestamp, sensor name, and state, we need to create new topics with the duration of each "on" and "off" state.

  1. Is that doable in KSQL, and how?
  2. Or should one really leave this to consumers or stream processors to figure out?

My data is something like this:

{ 2019:02:15 00:00:00, sensor1, off}
{ 2019:02:15 00:00:30, sensor1, on} 

to get the result

{ 2019:02:15 00:00:30, sensor1, off, 30sec }

Essentially, we have to combine the states of multiple sensors to determine the combined state of a machine. There are hundreds, and eventually possibly thousands, of sensors in a factory.

Thinus Marloth asked Oct 17 '22 06:10


1 Answer

This is pretty easy in Kafka Streams, so I would opt for 2.

First you have to model your input data properly. Your example uses local time without a zone offset, which makes it impossible to calculate durations between two timestamps reliably (for example, across a daylight-saving change). Use something like epoch time.
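As a minimal sketch of that conversion, the snippet below parses timestamps in the question's format, pins them to a time zone, and turns them into `Instant` values. The class name `TimestampExample` and the zone `Europe/Berlin` are assumptions for illustration only; use whatever zone your sensors actually report in. The second pair of timestamps shows why local time is a problem: the wall clock suggests 62 minutes, but only 2 minutes really passed because the clocks jumped forward.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TimestampExample {
  // Pattern for the question's sample data, e.g. "2019:02:15 00:00:30"
  static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("uuuu:MM:dd HH:mm:ss");

  /** Interprets a local timestamp in the given zone and converts it to an Instant. */
  public static Instant toInstant(String localTimestamp, ZoneId zone) {
    return LocalDateTime.parse(localTimestamp, FORMAT).atZone(zone).toInstant();
  }

  /** Real elapsed time between two local timestamps in the same zone. */
  public static Duration elapsed(String from, String to, ZoneId zone) {
    return Duration.between(toInstant(from, zone), toInstant(to, zone));
  }

  public static void main(String[] args) {
    // Assumption: the sensors report wall-clock time in this zone
    ZoneId zone = ZoneId.of("Europe/Berlin");

    // Epoch milliseconds are unambiguous and easy to serialize
    System.out.println(toInstant("2019:02:15 00:00:00", zone).toEpochMilli());

    // The sample from the question
    System.out.println(elapsed("2019:02:15 00:00:00", "2019:02:15 00:00:30", zone)); // PT30S

    // Across the daylight-saving change on 2019-03-31 the wall clock skips one hour
    System.out.println(elapsed("2019:03:31 01:59:00", "2019:03:31 03:01:00", zone)); // PT2M
  }
}
```

Doing this conversion once, at the producer or when deserializing, keeps the rest of the pipeline free of time zone logic.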

Start with a source data model like

interface SensorState {
  String getId();
  Instant getTime();
  State getState();
  enum State {
    OFF,
    ON
  }
}

and a target of

interface SensorStateWithDuration {
  SensorState getEvent();
  Duration getDuration();
}

Now that you have defined your input and output streams (but see “Data Types and Serialization”), you just need to transform the values (“Applying processors and transformers”) by defining a ValueTransformer.

It has to do 2 things:

  1. Check the state store for historical data for the sensor and update it with new data, if necessary

  2. When historical data is available, calculate the difference between the timestamps and emit the data together with the calculated duration

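import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

class DurationProcessor implements ValueTransformer<SensorState, SensorStateWithDuration> {
  // Name of the state store; must match the name registered in the topology
  static final String SENSOR_STATES = "SensorStates";

  KeyValueStore<String, SensorState> store;

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
    this.store = (KeyValueStore<String, SensorState>) context.getStateStore(SENSOR_STATES);
  }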

  public SensorStateWithDuration transform(SensorState sensorState) {
    // Nothing to do
    if (sensorState == null) {
      return null;
    }

    // Check for the previous state, update if necessary
    var oldState = checkAndUpdateSensorState(sensorState);

    // When we have historical data, return duration so far. Otherwise return null
    return oldState.map(state -> addDuration(state, sensorState)).orElse(null);
  }

  public void close() {}

  /**
   * Checks the state store for historical state based on sensor ID and updates it, if necessary.
   *
   * @param sensorState The new sensor state
   * @return The old sensor state
   */
  Optional<SensorState> checkAndUpdateSensorState(SensorState sensorState) {
    // The Sensor ID is our index
    var index = sensorState.getId();

    // Get the historical state (might be null)
    var oldState = store.get(index);
    if (needToUpdate(oldState, sensorState)) {
      // Update the state store to the new state
      store.put(index, sensorState);
    }
    return Optional.ofNullable(oldState);
  }

  /**
   * Check if we need to update the state in the state store.
   *
   * <p>Either we have no historical data, or the state has changed.
   *
   * @param oldState The old sensor state
   * @param sensorState The new sensor state
   * @return Flag whether we need to update
   */
  boolean needToUpdate(SensorState oldState, SensorState sensorState) {
    return oldState == null || oldState.getState() != sensorState.getState();
  }

  /**
   * Wrap the old state with a duration how long it lasted.
   *
   * @param oldState The state of the sensor so far
   * @param sensorState The new state of the sensor
   * @return Wrapped old state with duration
   */
  SensorStateWithDuration addDuration(SensorState oldState, SensorState sensorState) {
    var duration = Duration.between(oldState.getTime(), sensorState.getTime());
    return SensorStateWithDuration.builder().setEvent(oldState).setDuration(duration).build();
  }
}
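To see what this logic emits for the sample data in the question, here is a rough sketch that runs the same steps without Kafka. Everything in it is a stand-in I made up for illustration: `DurationSketch`, the records `Reading` and `Closed`, and a plain `HashMap` playing the role of the state store. It is not the real transformer, just its decision logic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DurationSketch {
  public enum State { OFF, ON }

  // Hypothetical stand-ins for SensorState and SensorStateWithDuration
  public record Reading(String id, Instant time, State state) {}
  public record Closed(Reading event, Duration duration) {}

  // Stand-in for the state store: the last state change seen per sensor ID
  private final Map<String, Reading> store = new HashMap<>();

  /** Returns the stored state with its duration so far, or null for a sensor's first reading. */
  public Closed transform(Reading reading) {
    Reading old = store.get(reading.id());
    // Only overwrite the stored reading when there is none yet or the state changed
    if (old == null || old.state() != reading.state()) {
      store.put(reading.id(), reading);
    }
    return old == null ? null : new Closed(old, Duration.between(old.time(), reading.time()));
  }

  /** Feeds all readings through a fresh instance and collects the non-null results. */
  public static List<Closed> run(List<Reading> readings) {
    var sketch = new DurationSketch();
    var results = new ArrayList<Closed>();
    for (Reading reading : readings) {
      Closed closed = sketch.transform(reading);
      if (closed != null) {
        results.add(closed);
      }
    }
    return results;
  }

  public static void main(String[] args) {
    var readings = List.of(
        new Reading("sensor1", Instant.parse("2019-02-15T00:00:00Z"), State.OFF),
        new Reading("sensor1", Instant.parse("2019-02-15T00:00:30Z"), State.ON),
        // Same state again: the stored reading is kept, so this reports "ON so far"
        new Reading("sensor1", Instant.parse("2019-02-15T00:00:45Z"), State.ON));

    for (Closed closed : run(readings)) {
      System.out.println(closed.event().state() + " lasted " + closed.duration());
    }
    // OFF lasted PT30S
    // ON lasted PT15S
  }
}
```

Note that a repeated reading with an unchanged state still produces a result (the duration so far). If you only want one record per completed state, filter on a state change before emitting.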

Putting everything together (“Connecting Processors and State Stores”) in a simple Topology:

var builder = new StreamsBuilder();

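// Our state store. storeSerde, inputSerde and resultSerde are the Serdes for
// the model types, defined elsewhere (see “Data Types and Serialization”)
var storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("SensorStates"),
        Serdes.String(),
        storeSerde);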

// Register the store builder
builder.addStateStore(storeBuilder);

builder.stream("input-topic", Consumed.with(Serdes.String(), inputSerde))
    .transformValues(DurationProcessor::new, DurationProcessor.SENSOR_STATES)
    .to("result-topic", Produced.with(Serdes.String(), resultSerde));

var topology = builder.build();

A full application is at github.com/melsicon/kafka-sensors.

eik answered Oct 23 '22 08:10