Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Adding Custom Headers in Kafka Message

Tags:

I am sending a file as a message by converting it to a byte array using kafka producer.

I also need to add some headers for the message, for example file name, timestamps etc so at the consumer end I can process the message based on file name and other headers.

What I am currently doing is creating a object and wrapping the raw message and headers in it and sending the object in a byte array as a message.

I would like to know if there is a way by which I can add custom headers while publishing the message?

like image 311
CTDex Avatar asked Mar 13 '15 05:03

CTDex


People also ask

Do Kafka messages have headers?

A Header is a key-value pair, and multiple headers can be included with the key, value, and timestamp in each Kafka message. If the value contains schema information, then the header will have a non-null schema .

How do I add a header to a Kafka template?

We can add headers to a Kafka message using either Message<?> or ProducerRecord<String, String> like shown in the following code. We configure the KafkaTemplate inside the SenderConfig class. For simplicity we used a StringSerializer for both key and value fields.

Can Kafka message be modified?

You can modify Kafka topic properties by navigating to the Topics page, and editing content in the Configs tab of the topic Profile.


2 Answers

Kafka v0.11.0.0 adds support for custom headers.

You can add them when creating a ProducerRecord like this:

new ProducerRecord(key, value, headers, ...), where headers is of type Iterable<Header>

For more details see:

https://issues.apache.org/jira/browse/KAFKA-4208

https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers

like image 75
Michal Borowiecki Avatar answered Sep 21 '22 19:09

Michal Borowiecki


Record level headers were introduced from Kafka 0.11.0. We can send a list of Headers in each record.

List<Header> headers = Arrays.asList(new RecordHeader("header_key", "header_value".getBytes())); ProducerRecord<String, String> record = new ProducerRecord<>("topic", null, "key", "value", headers); 
like image 23
meril Avatar answered Sep 19 '22 19:09

meril