Is it possible to get the latest value for a message key from Kafka messages?

Suppose I have different values for the same message key.

For example:

{
  "userid": 1,
  "email": "[email protected]"
}

{
  "userid": 1,
  "email": "[email protected]"
}

{
  "userid": 1,
  "email": "[email protected]"
}

In the above case I want only the latest value updated by the user, that is, '[email protected]'.

My Kafka stream should give me only the third value, not the previous two values.

asked Dec 18 '22 by Wilmetta

2 Answers

Since you've not specified a particular client, I'll show you how this can be done with ksqlDB and the newly added LATEST_BY_OFFSET function.

First, I populate the topic with source data:

kafkacat -b broker:29092 -P -t test_topic -K: <<EOF
1:{ "userid": 1, "email": "[email protected]" }
1:{ "userid": 1, "email": "[email protected]" }
1:{ "userid": 1, "email": "[email protected]" }
EOF

Then, in ksqlDB, first model this as a stream of events:

ksql> CREATE STREAM USER_UPDATES (USERID INT, EMAIL VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT ROWKEY, USERID, EMAIL FROM USER_UPDATES EMIT CHANGES LIMIT 3;
+---------+---------+-----------------+
|ROWKEY   |USERID   |EMAIL            |
+---------+---------+-----------------+
|1        |1        |[email protected]  |
|1        |1        |[email protected]  |
|1        |1        |[email protected]  |

Now we can tell ksqlDB to take this stream of events and give us just the latest value (based on the offset), either directly:

ksql> SELECT USERID, LATEST_BY_OFFSET(EMAIL) FROM USER_UPDATES GROUP BY USERID EMIT CHANGES;
+--------------------+--------------------+
|USERID              |KSQL_COL_1          |
+--------------------+--------------------+
|1                   |[email protected]     |

Press CTRL-C to interrupt

or more usefully, as materialised state within ksqlDB:

CREATE TABLE USER_LATEST_STATE AS 
    SELECT USERID, LATEST_BY_OFFSET(EMAIL) AS EMAIL 
      FROM USER_UPDATES 
     GROUP BY USERID 
     EMIT CHANGES;

This table is still driven by changes to the Kafka topic, but can be queried directly for the current state, either as of now ("pull query"):

ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1;
+--------------------+
|EMAIL               |
+--------------------+
|[email protected]     |
Query terminated
ksql>

or as a stream of changes as the state evolves ("push query"):

ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1 EMIT CHANGES;
+--------------------+
|EMAIL               |
+--------------------+
|[email protected]     |

[ query continues indefinitely ]


answered Apr 05 '23 by Robin Moffatt

It seems that you want to buffer the records before further processing. In streaming you have an ever-growing, infinite data set, so you never know whether to wait for more records or to flush the buffer for further processing. Can you add more details about how you will process these records?

You can introduce an additional parameter: the maximum time you are willing to wait before flushing the buffer. To achieve this you can use a session window or a tumbling window, use the record cache in combination with a commit interval, or implement the logic yourself with the Kafka Streams low-level Processor API.
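
Of these, the record-cache route is mostly configuration. Here is a minimal sketch of it, using the same userInfoKStream stream as the tumbling-window example below; the 30-second commit interval and cache size are illustrative assumptions, not values from the original answer:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;

Properties props = new Properties();
// forward cached aggregation results downstream at most every 30 seconds...
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);
// ...and give the record cache room to collapse per-key updates in between
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);

// reduce to the newest value per key; while a key sits in the cache,
// newer updates overwrite older ones before anything is emitted downstream
KTable<String, String> latestUserInfo = userInfoKStream
    .groupByKey()
    .reduce((currentValue, newValue) -> newValue);

With caching enabled, downstream operators see roughly one record per key per commit interval rather than every intermediate update, though this trades latency for deduplication and gives no hard guarantee of "only the latest" value.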

Here is example code showing how you can achieve this with a tumbling window: it aggregates and suppresses all updates for a userId within a one-hour window, accepts events that arrive up to 10 minutes late, and then sends the suppressed result to the downstream processor (note that with this approach you may not get the final result until a new event arrives and advances stream time):

import java.time.Duration;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

userInfoKStream
    .groupByKey()
    // 1-hour tumbling windows, accepting records up to 10 minutes late
    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(10)))
    // keep only the newest value seen for each userId within the window
    .aggregate(() -> "", (userId, newValue, currentValue) -> newValue)
    // hold back intermediate results until the window closes
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .foreach((userId, value) -> { /* process the final value here */ });
answered Apr 05 '23 by Tuyen Luong