
Is it possible to access message headers with Kafka Streams?

With the addition of Headers to the records (ProducerRecord & ConsumerRecord) in Kafka 0.11, is it possible to get these headers when processing a topic with Kafka Streams? When calling methods like map on a KStream it provides arguments of the key and the value of the record but no way I can see to access the headers. It would be nice if we could just map over the ConsumerRecords.

Example:

```java
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
    ...
```

Something like this would work:

```java
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((record) -> {
        record.headers();
        record.key();
        record.value();
    })
    ...
```
Nathan Myles asked Oct 13 '17



1 Answer

Record headers are accessible since version 2.0.0 (cf. KIP-244 for details).

You can access record metadata via the Processor API (i.e., via transform(), transformValues(), or process()), through the ProcessorContext object handed to your processor (cf. https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).
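For example, inside a Transformer the current record's headers can be read with ProcessorContext.headers(). A minimal sketch of that pattern; the topic name and the "my-header" header key are assumptions for illustration:

```java
import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class HeaderReadingExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("some-topic"); // assumed topic

        stream.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                // Streams hands us the context; it exposes metadata of the record
                // currently being processed, including its headers (since 2.0)
                this.context = context;
            }

            @Override
            public KeyValue<String, String> transform(String key, String value) {
                // lastHeader() returns the most recent header with that key, or null
                Header header = context.headers().lastHeader("my-header"); // assumed key
                String headerValue = (header != null) ? new String(header.value()) : "none";
                return KeyValue.pair(key, value + "|" + headerValue);
            }

            @Override
            public void close() {}
        });
    }
}
```

Running this requires the kafka-streams dependency and a running cluster, so it is a compile-level sketch rather than a standalone program.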

Update

As of the 2.7.0 release, the Processor API was improved (cf. KIP-478), adding a new type-safe api.Processor class with a process(Record) method instead of process(K, V). For this case, headers (and record metadata) are accessible via the Record class.
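With the 2.7+ API the processor receives the whole Record, so headers are available directly on it. A sketch along those lines; the topic names and the "trace-id" header key are assumptions for illustration:

```java
import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class NewApiHeaderExample {

    // The type-safe Processor from KIP-478: process() gets the full Record,
    // including key, value, timestamp, and headers
    static class HeaderProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            Header h = record.headers().lastHeader("trace-id"); // assumed header key
            if (h != null) {
                // withValue() derives a new Record; headers are carried along
                context.forward(record.withValue(record.value() + "|" + new String(h.value())));
            } else {
                context.forward(record);
            }
        }
    }

    public static void main(String[] args) {
        Topology topology = new Topology();
        topology.addSource("source", "input-topic");          // assumed topic
        topology.addProcessor("headers", HeaderProcessor::new, "source");
        topology.addSink("sink", "output-topic", "headers");  // assumed topic
    }
}
```

As with the previous sketch, this needs the kafka-streams dependency and a broker to actually run.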

This new feature is not yet available via the PAPI methods of the DSL though (e.g., KStream#process(), KStream#transform(), and siblings).

+++++

Prior to 2.0, the context only exposes topic, partition, offset, and timestamp, but not headers, which are in fact dropped by Streams on read in those older versions.

Metadata is not available at the DSL level though. However, there is also work in progress to extend the DSL via KIP-159.

Matthias J. Sax answered Sep 20 '22