Suppose I have different values for the same message key.
For example:
{ "userid": 1, "email": "[email protected]" }
{ "userid": 1, "email": "[email protected]" }
{ "userid": 1, "email": "[email protected]" }
In the above case I want only the latest value submitted by the user, that is, '[email protected]'. My Kafka stream should give me only the third value, not the previous two.
Since you've not specified a particular client, I'll show you how this can be done with ksqlDB and the newly added function LATEST_BY_OFFSET.
First, I populate the topic with source data:
kafkacat -b broker:29092 -P -t test_topic -K: <<EOF
1:{ "userid": 1, "email": "[email protected]" }
1:{ "userid": 1, "email": "[email protected]" }
1:{ "userid": 1, "email": "[email protected]" }
EOF
Then, in ksqlDB, model this as a stream of events first:
ksql> CREATE STREAM USER_UPDATES (USERID INT, EMAIL VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT ROWKEY, USERID, EMAIL FROM USER_UPDATES EMIT CHANGES LIMIT 3;
+---------+---------+-----------------+
|ROWKEY |USERID |EMAIL |
+---------+---------+-----------------+
|1 |1 |[email protected] |
|1 |1 |[email protected] |
|1 |1 |[email protected] |
Now we can tell ksqlDB to take this stream of events and give us just the latest value (based on the offset), either directly:
ksql> SELECT USERID, LATEST_BY_OFFSET(EMAIL) FROM USER_UPDATES GROUP BY USERID EMIT CHANGES;
+--------------------+--------------------+
|USERID |KSQL_COL_1 |
+--------------------+--------------------+
|1 |[email protected] |
Press CTRL-C to interrupt
or more usefully, as materialised state within ksqlDB:
CREATE TABLE USER_LATEST_STATE AS
SELECT USERID, LATEST_BY_OFFSET(EMAIL) AS EMAIL
FROM USER_UPDATES
GROUP BY USERID
EMIT CHANGES;
This table is still driven by changes to the Kafka topic, but can be queried directly for the current state, either as of now ("pull query"):
ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1;
+--------------------+
|EMAIL |
+--------------------+
|[email protected] |
Query terminated
ksql>
or as a stream of changes as the state evolves ("push query"):
ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1 EMIT CHANGES;
+--------------------+
|EMAIL |
+--------------------+
|[email protected] |
[ query continues indefinitely ]
It seems that you want to buffer the records before further processing. Since streaming deals with ever-growing, infinite data sets, you never know whether to wait for more records or to flush the buffer for further processing. Can you add more details about how you will process these records?
You can introduce an additional parameter: the maximum time you want to wait before flushing the buffer. To achieve this you can use a session window or a tumbling window, use the record cache in combination with a commit interval, or implement it with Kafka's low-level Processor API.
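For the record-cache route, here is a minimal sketch of the relevant configuration (the application id, broker address, and sizes are illustrative assumptions, not values from the question):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-email-buffer"); // hypothetical app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:29092");   // assumed broker address
// A larger cache lets more updates per key be coalesced in memory before forwarding
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);
// The cache is flushed (emitting the latest value per key) at least every commit interval
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);

With these two settings, repeated updates to the same key within a commit interval are typically collapsed into a single downstream record, though this is an optimisation rather than a hard guarantee.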
Here is example code showing how you can achieve this with a tumbling window: it aggregates and suppresses all the userId info in a one-hour window, accepts events that arrive up to 10 minutes late, and then sends the suppressed events to the downstream processor (note that with suppression you may not get the final result until a new event arrives to advance stream time):
userInfoKStream
    .groupByKey()
    // 1-hour tumbling windows, accepting records that arrive up to 10 minutes late
    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(10)))
    // keep only the newest value seen for each key within the window
    // (assumes String keys and values; adjust the serdes to your types)
    .aggregate(() -> "", (userId, newValue, currentValue) -> newValue,
            Materialized.with(Serdes.String(), Serdes.String()))
    // emit a single final result per key only once the window closes
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    // downstream processing of the final value per user goes here
    .foreach((userId, value) -> {});
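For context, the userInfoKStream used above could be built like this (the topic name and String serdes are assumptions to make the snippet self-contained):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> userInfoKStream =
        builder.stream("test_topic", Consumed.with(Serdes.String(), Serdes.String()));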