
Receiving Kafka Key in spring boot kafka listener

I am new to Spring Kafka. I have a microservice that sends messages with a Kafka key that is a user-defined object.

1) The first microservice sends a message to Kafka with a key that is an instance of MyKey.

2) I need to listen to that topic, receive the message together with its key, and create a new key from the received key.

Let's say the message is sent with the key myKey. In the listener, I want to create a new extended key like this:

     @KafkaListener(groupId = Bindings.CONSUMER_GROUP_DATA_CLEANUP, topics = "users")
     public void process(@Payload MyMessage myMessage) {
         // myKey should be the key of the received record; it is not available here yet
         MyExtendedKey myExtendedKey = new MyExtendedKey(myKey.getX(), myKey.getY());
         ....
         ....
         kafkaTemplate.send(TOPIC, myExtendedKey, message);
     }

I do not know how to get the key of the received message in the listener.

user1474111 asked Nov 24 '25 12:11


2 Answers

Please read the documentation.

...

Finally, metadata about the message is available from message headers. You can use the following header names to retrieve the headers of the message:

KafkaHeaders.RECEIVED_MESSAGE_KEY (now .RECEIVED_KEY)

KafkaHeaders.RECEIVED_TOPIC

KafkaHeaders.RECEIVED_PARTITION_ID (now .RECEIVED_PARTITION)

KafkaHeaders.RECEIVED_TIMESTAMP

KafkaHeaders.TIMESTAMP_TYPE

The following example shows how to use the headers:

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}

The offset is also available, via KafkaHeaders.OFFSET.
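Applied to the question, the key can be bound as an extra listener parameter, for example @Header(KafkaHeaders.RECEIVED_KEY) MyKey myKey, assuming the consumer's key deserializer is configured to produce MyKey instances. What remains is deriving the new key from it. The following is only a plain-Java sketch of that step: MyKey and MyExtendedKey are hypothetical stand-ins for the question's classes, the int fields and the from(...) factory are assumptions, and the null check is there because Kafka allows records without a key.

```java
import java.util.Objects;

public class KeyMapping {

    // Hypothetical stand-in for the question's MyKey; the int fields are an assumption.
    static final class MyKey {
        private final int x;
        private final int y;

        MyKey(int x, int y) {
            this.x = x;
            this.y = y;
        }

        int getX() { return x; }
        int getY() { return y; }
    }

    // Hypothetical stand-in for the question's MyExtendedKey.
    static final class MyExtendedKey {
        private final int x;
        private final int y;

        MyExtendedKey(int x, int y) {
            this.x = x;
            this.y = y;
        }

        int getX() { return x; }
        int getY() { return y; }

        // Builds the outgoing key from the key the listener received.
        // Fails fast with a NullPointerException if the record had no key.
        static MyExtendedKey from(MyKey received) {
            Objects.requireNonNull(received, "received record has no key");
            return new MyExtendedKey(received.getX(), received.getY());
        }
    }

    public static void main(String[] args) {
        MyExtendedKey extended = MyExtendedKey.from(new MyKey(1, 2));
        System.out.println(extended.getX() + "," + extended.getY()); // prints 1,2
    }
}
```

Inside the listener, the send call from the question would then read kafkaTemplate.send(TOPIC, MyExtendedKey.from(myKey), message).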

EDIT

Some headers have been renamed since 2.9; see above.

Gary Russell answered Nov 28 '25 17:11


The easiest way to get the key, value, and metadata of a message with @KafkaListener is to accept a ConsumerRecord as the listener method's parameter instead of receiving only the payload.

@KafkaListener(topics = "any-topic")
void listener(ConsumerRecord<String, String> record) {
    System.out.println(record.key());
    System.out.println(record.value());
    System.out.println(record.partition());
    System.out.println(record.topic());
    System.out.println(record.offset());
}

It does not have the nice annotations, but it works. Also, if you want to receive records from a Kafka topic, process them, and send them on to another Kafka topic, I would recommend taking a look at the Kafka Streams API.

Felipe Tapia answered Nov 28 '25 16:11