I am sending a file as a message by converting it to a byte array using kafka producer.
I also need to add some headers for the message, for example file name, timestamps etc so at the consumer end I can process the message based on file name and other headers.
What I am currently doing is creating a object and wrapping the raw message and headers in it and sending the object in a byte array as a message.
I would like to know if there is a way by which I can add custom headers while publishing the message?
A Header is a key-value pair, and multiple headers can be included with the key, value, and timestamp in each Kafka message. If the value contains schema information, then the header will have a non-null schema .
We can add headers to a Kafka message using either Message<?> or ProducerRecord<String, String> like shown in the following code. We configure the KafkaTemplate inside the SenderConfig class. For simplicity we used a StringSerializer for both key and value fields.
You can modify Kafka topic properties by navigating to the Topics page, and editing content in the Configs tab of the topic Profile.
Kafka v0.11.0.0 adds support for custom headers.
You can add them when creating a ProducerRecord like this:
new ProducerRecord(key, value, headers, ...), where headers is of type Iterable<Header>
For more details see:
https://issues.apache.org/jira/browse/KAFKA-4208
https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers
Record level headers were introduced from Kafka 0.11.0. We can send a list of Headers in each record.
List<Header> headers = Arrays.asList(new RecordHeader("header_key", "header_value".getBytes())); ProducerRecord<String, String> record = new ProducerRecord<>("topic", null, "key", "value", headers);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With