Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka Consumer get key value pair

I'm currently working with Kafka and Flink, I have kafka running in my local PC and I created a topic that is being consumed.

Desktop\kafka\bin\windows>kafka-console-consumer.bat --zookeeper localhost:2181 -topic test

but it is just retrieving the message, enter image description here

is there a way to get futher details about the message ? lets say time? key? I checked kafka documentation but I didn't find something about this topic

like image 874
André De La O Campos Avatar asked Dec 30 '16 17:12

André De La O Campos


People also ask

What is key-value pair in Kafka?

A key-value pair defined for a single Kafka Streams record. If the record comes directly from a Kafka topic then its key/value are defined as the message key/value.

How do I find my consumer group ID Kafka?

You can get the value of group.id for your kafka cluster by looking into $KAFKA_HOME/config/consumer. properties . There you can see the line #consumer group id . Use this value and your code will work.

How do you get a list of consumers connected to a Kafka topic?

Get the list of consumer groups for a topic. Use kafka-consumer-groups.sh to list all consumer groups. Note that the below command will list all the consumer groups for all topics managed by the cluster.


1 Answers

Using out of the box console consumer (I am using Kafka 0.9.0.1) you can only print the key and the value of messages using different formats. To print the key, set the property print.key=true.

There is another property key.separator that by default is "\t" (a tab) that you can also change to anything you want.

To set these properties you can create a config file and use --consumer.config <config file> or pass the properties using --property key=value.

You can also implement your own formatter and use it with --formatter option but you will still have just the key and value because that is what the MessageFormatter trait provides (see writeTo below).

trait MessageFormatter {     def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)      def init(props: Properties) {}      def close() {} } 

For example:

./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server kafka-1:9092 --topic topic1 --property print.key=true --property key.separator="-" --from-beginning key-p1 key-p2 key-p3 null-4 null-8 null-0 
like image 197
Luciano Afranllie Avatar answered Sep 16 '22 18:09

Luciano Afranllie