
How to view Kafka headers

We are sending messages with headers to Kafka using org.apache.kafka.clients.producer.ProducerRecord:

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
    this(topic, partition, (Long)null, key, value, headers);
}

How can I see these headers from the command line? kafka-console-consumer.sh only shows me the payload, not the headers.

Sammy Pawar asked Mar 15 '19 10:03



5 Answers

You can use the excellent kafkacat tool.

Sample command:

kafkacat -b kafka-broker:9092 -t my_topic_name -C \
  -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'

Sample output:

Key (-1 bytes):
  Value (13 bytes): {foo:"bar 5"}
  Timestamp: 1548350164096
  Partition: 0
  Offset: 34
  Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALUE_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Converting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:

The %h header format option is only available in recent builds of kafkacat; if your current version doesn't include it, you may need to build from the master branch yourself.


You can also run kafkacat from Docker:

docker run --rm edenhill/kafkacat:1.5.0 \
      -b kafka-broker:9092 \
      -t my_topic_name -C \
      -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'

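If you use Docker, bear in mind the networking implications: the container must be able to reach the Kafka broker. For example, localhost inside the container refers to the container itself, not to the host.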

Robin Moffatt answered Oct 23 '22 12:10


Starting with Kafka 2.7.0, you can print headers in kafka-console-consumer.sh by passing the property print.headers=true:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic quickstart-events --property print.key=true --property print.headers=true --property print.timestamp=true

Aliaksei answered Oct 23 '22 14:10


You can also use kafkactl for this, e.g. with output as YAML:

kafkactl consume my-topic --print-headers -o yaml

Sample output:

partition: 1
offset: 22
headers:
  key1: value1
  key2: value2
value: my-value

Disclaimer: I am a contributor to this project.

D-rk answered Oct 23 '22 12:10


From the kafka-console-consumer.sh script:

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

src: https://github.com/apache/kafka/blob/2.1.1/bin/kafka-console-consumer.sh

In kafka.tools.ConsoleConsumer the headers are passed to the formatter, but as of 2.1.1 none of the existing formatters makes use of them:

formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
                                     msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers),
                                     output)

src: https://github.com/apache/kafka/blob/2.1.1/core/src/main/scala/kafka/tools/ConsoleConsumer.scala

At the bottom of the file linked above you can see the existing formatters.

If you want to print headers, you need to implement your own kafka.common.MessageFormatter, and in particular its writeTo method:

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit

Then run the console consumer with --formatter set to your own formatter class, which must also be on the classpath.

Another, simpler and faster way would be to write your own mini-program using KafkaConsumer and inspect the headers in a debugger.
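
Whichever route you take (custom formatter or mini-program), the part you actually have to write is turning the header bytes into text. Below is a minimal sketch of just that step, not a full consumer. To keep it runnable without kafka-clients on the classpath, it uses a hypothetical stand-in class, SimpleHeader, that mirrors the key() and value() accessors of org.apache.kafka.common.header.Header. The class name HeaderPrinter, the key=value output layout, and the assumption that header values are UTF-8 text are all mine, not something Kafka prescribes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class HeaderPrinter {

    // Hypothetical stand-in for org.apache.kafka.common.header.Header,
    // which exposes the same two accessors: key() and value().
    public static final class SimpleHeader {
        private final String key;
        private final byte[] value;

        public SimpleHeader(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }

        public String key() { return key; }
        public byte[] value() { return value; }
    }

    // Renders headers as "k1=v1,k2=v2". Header values are raw bytes and may
    // be null; this sketch assumes UTF-8 text and prints null values as "null".
    public static String format(Iterable<SimpleHeader> headers) {
        List<String> parts = new ArrayList<>();
        for (SimpleHeader h : headers) {
            String value = h.value() == null
                    ? "null"
                    : new String(h.value(), StandardCharsets.UTF_8);
            parts.add(h.key() + "=" + value);
        }
        return String.join(",", parts);
    }

    // In a real mini-program the headers would come from the consumer, roughly:
    //   for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1)))
    //       for (Header h : record.headers()) { ... h.key(), h.value() ... }
    public static void main(String[] args) {
        List<SimpleHeader> headers = List.of(
                new SimpleHeader("key1", "value1".getBytes(StandardCharsets.UTF_8)),
                new SimpleHeader("key2", null));
        System.out.println(format(headers)); // prints key1=value1,key2=null
    }
}
```

With the real client you would drop SimpleHeader, import org.apache.kafka.common.header.Header instead, and pass record.headers() to format, since it is iterable over Header.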

tgrez answered Oct 23 '22 14:10


With kcat (the current name of kafkacat):

kcat -C -b $brokers -t $topic -f 'key: %k Headers: %h: Message value: %s\n'

Prosenjit Bari answered Oct 23 '22 14:10