
How can I get the offset value in KStream

I'm developing a PoC with Kafka Streams. I need to get the offset value in the stream consumer and use it to generate a unique key, (topic-offset)->hash, for each message. The reason is that the producers are syslog and only a few of them have IDs. I cannot generate a UUID in the consumer because, if the data is reprocessed, I need to regenerate the same key.

My problem is: the org.apache.kafka.streams.processor.ProcessorContext class exposes an .offset() method that returns the value, but I'm using KStream instead of the Processor, and I couldn't find a method that returns the same thing.

Does anybody know how to extract the consumer offset for each record from a KStream? Thanks in advance.

Stefano Frigerio asked Nov 25 '16 14:11





1 Answer

You can mix and match the DSL and the Processor API via process(...), transform(...), and transformValues(...).

This gives you access to the current record's offset, just as in the plain Processor API. In your case, it seems you want to use KStream#transform(...).
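As a rough illustration of the key-generation side, the sketch below derives a deterministic key from a record's coordinates using only the JDK. The class name RecordKeys and the method keyFor are hypothetical and not part of Kafka. It assumes the three inputs come from ProcessorContext#topic(), #partition(), and #offset(), read inside a Transformer passed to KStream#transform(...). The partition is included as an assumption on my part: offsets are only unique within a partition, so topic plus offset alone could collide on a multi-partition topic.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper (not part of Kafka): builds a reproducible key
// from the coordinates of a record.
class RecordKeys {

    // Returns a name-based UUID for topic/partition/offset.
    // The same coordinates always yield the same key, so a reprocessing
    // run regenerates identical keys.
    static String keyFor(String topic, int partition, long offset) {
        String coordinates = topic + "-" + partition + "-" + offset;
        byte[] bytes = coordinates.getBytes(StandardCharsets.UTF_8);
        return UUID.nameUUIDFromBytes(bytes).toString();
    }

    public static void main(String[] args) {
        // Inside Transformer#transform(key, value) the call would look like:
        //   keyFor(context.topic(), context.partition(), context.offset())
        // where context is the ProcessorContext saved in init(...).
        System.out.println(keyFor("syslog", 0, 42L));
    }
}
```

UUID.nameUUIDFromBytes is used here only because it is deterministic and ships with the JDK; any stable hash of the same string would serve the same purpose.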

Matthias J. Sax answered Oct 27 '22 10:10