With the addition of Headers to the records (ProducerRecord & ConsumerRecord) in Kafka 0.11, is it possible to get these headers when processing a topic with Kafka Streams? When calling methods like map on a KStream, the arguments are the key and the value of the record, but there is no way that I can see to access the headers. It would be nice if we could just map over the ConsumerRecords.
ex.

```java
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");

stream
    .map((key, value) -> ...)  // can I get access to headers in methods like map, filter, aggregate, etc.?
    ...
```
Something like this would work:

```java
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");

stream
    .map((record) -> {
        record.headers();
        record.key();
        record.value();
    })
    ...
```
Interface Header: a Header is a key-value pair, and multiple headers can be included with the key, value, and timestamp in each Kafka message.
Calling headers().add(new RecordHeader("key", "value1".getBytes())) on a record attaches that header key and value to the data being sent to Kafka. To verify your data, you can inspect the topic in the Kafka Control Center and check the headers that were sent.
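As a minimal, illustrative sketch (the topic name, header key, and serializer settings below are assumptions, not part of the original post), a producer can attach such a header before sending:

```java
// Hedged sketch: attaching a header to a ProducerRecord before sending.
// Bootstrap server, topic, key, and value are placeholders.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class HeaderProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("some-topic", "record-key", "record-value");
            // Headers are key/byte[] pairs carried alongside key, value, and timestamp.
            record.headers().add(new RecordHeader("key", "value1".getBytes(StandardCharsets.UTF_8)));
            producer.send(record);
        }
    }
}
```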
Record headers are accessible since version 2.0.0 (cf. KIP-244 for details).
You can access record metadata via the Processor API (i.e. via transform(), transformValues(), or process()), through the given "context" object (cf. https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).
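For illustration, here is a hedged sketch of that approach, assuming a String-keyed/String-valued topic named "some-topic", an output topic, and a header key of "key" (all placeholders): transformValues() reads the current record's headers from the ProcessorContext.

```java
// Hedged sketch: reading headers inside transformValues() via the ProcessorContext.
import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class HeaderAwareTopology {
    public static void build(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("some-topic");

        stream.transformValues(() -> new ValueTransformer<String, String>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;  // the "context" object mentioned above
            }

            @Override
            public String transform(String value) {
                // Headers of the record currently being processed (available since 2.0).
                Header h = context.headers().lastHeader("key");
                return (h == null) ? value : value + "|" + new String(h.value());
            }

            @Override
            public void close() { }
        }).to("some-output-topic");
    }
}
```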
Update
As of the 2.7.0 release, the Processor API was improved (cf. KIP-478), adding a new type-safe api.Processor class with a process(Record) method instead of process(K, V). For this case, headers (and record metadata) are accessible via the Record class.
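Here is a hedged sketch of that newer API (the class name, forwarded value, and header key are illustrative assumptions, not from the answer): a Processor implementing process(Record) reads headers directly from the Record.

```java
// Hedged sketch: the type-safe Processor introduced by KIP-478 (2.7.0+).
import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class HeaderReadingProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        // Headers travel with the Record itself in the new API.
        Header h = record.headers().lastHeader("key");
        String suffix = (h == null) ? "" : "|" + new String(h.value());
        context.forward(record.withValue(record.value() + suffix));
    }
}
```

Such a processor would be wired into a Topology via Topology#addProcessor(); the forwarded record keeps its headers.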
This new feature is not yet available in the "PAPI methods" of the DSL though (e.g. KStream#process(), KStream#transform(), and siblings).
+++++
Prior to 2.0, the context only exposes topic, partition, offset, and timestamp, but not headers, which are in fact dropped by Streams on read in those older versions.
Metadata is not available at the DSL level though. However, there is also work in progress to extend the DSL via KIP-159.