I am new to Spring Kafka. I have a microservice that sends messages with a Kafka key that is a user-defined object.
1) The first microservice sends a message to Kafka with a key that is an instance of MyKey.
2) What I need to do is listen to that topic, receive the message together with its key, and create a new key from that key.
Let's say the message is sent with the key myKey. In the listener, I want to create a new extended key like this:
@KafkaListener(groupId = Bindings.CONSUMER_GROUP_DATA_CLEANUP, topics = "users")
public void process( @Payload MyMessage myMessage){
MyExtendedKey myExtendedKey = new MyExtendedKey(myKey.getX(), myKey.getY());
....
....
kafkaTemplate.send(TOPIC, myExtendedKey, myMessage);
}
I do not know how to get the key of the received message inside the listener.
Please read the documentation.
...
Finally, metadata about the message is available from message headers. You can use the following header names to retrieve the headers of the message:
KafkaHeaders.RECEIVED_MESSAGE_KEY (now .RECEIVED_KEY)
KafkaHeaders.RECEIVED_TOPIC
KafkaHeaders.RECEIVED_PARTITION_ID (now .RECEIVED_PARTITION)
KafkaHeaders.RECEIVED_TIMESTAMP
KafkaHeaders.TIMESTAMP_TYPE
The following example shows how to use the headers:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(KafkaHeaders.RECEIVED_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
The offset is also available.
EDIT
Some headers have been renamed since 2.9; see above.
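Applied to the types from the question, the listener would take the key as an extra parameter annotated with @Header(KafkaHeaders.RECEIVED_KEY) and build the extended key from it. The following is only a rough sketch: the Spring annotations are kept in comments so that it compiles without Spring on the classpath, and the String fields of MyKey, the MyMessage body, the topic name "users-extended", and the in-memory sent list standing in for kafkaTemplate.send(...) are all assumptions made for illustration. It also assumes the consumer's key deserializer is configured to produce MyKey instances.

```java
import java.util.ArrayList;
import java.util.List;

class KeyForwardingSketch {

    // Hypothetical stand-ins for the question's types; the String fields are assumptions.
    static final class MyKey {
        private final String x;
        private final String y;
        MyKey(String x, String y) { this.x = x; this.y = y; }
        String getX() { return x; }
        String getY() { return y; }
    }

    static final class MyExtendedKey {
        final String x;
        final String y;
        MyExtendedKey(String x, String y) { this.x = x; this.y = y; }
    }

    static final class MyMessage {
        final String body;
        MyMessage(String body) { this.body = body; }
    }

    // One outgoing record: the arguments that would go to kafkaTemplate.send(topic, key, value).
    static final class Outgoing {
        final String topic;
        final MyExtendedKey key;
        final MyMessage value;
        Outgoing(String topic, MyExtendedKey key, MyMessage value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // Assumed target topic name, not taken from the question.
    static final String TOPIC = "users-extended";

    // In-memory stand-in for the KafkaTemplate, so the sketch runs without a broker.
    static final List<Outgoing> sent = new ArrayList<>();

    // With Spring Kafka this method would be annotated
    //   @KafkaListener(groupId = Bindings.CONSUMER_GROUP_DATA_CLEANUP, topics = "users")
    // and its parameters would be declared as
    //   @Payload MyMessage myMessage,
    //   @Header(KafkaHeaders.RECEIVED_KEY) MyKey myKey
    static void process(MyMessage myMessage, MyKey myKey) {
        // Derive the new key from the key the record was received with.
        MyExtendedKey myExtendedKey = new MyExtendedKey(myKey.getX(), myKey.getY());
        // Stand-in for: kafkaTemplate.send(TOPIC, myExtendedKey, myMessage);
        sent.add(new Outgoing(TOPIC, myExtendedKey, myMessage));
    }

    public static void main(String[] args) {
        process(new MyMessage("hello"), new MyKey("a", "b"));
        Outgoing out = sent.get(0);
        System.out.println(out.topic + " " + out.key.x + " " + out.key.y + " " + out.value.body);
    }
}
```

On Spring Kafka versions older than the rename mentioned above, the header constant would be KafkaHeaders.RECEIVED_MESSAGE_KEY instead.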
The easiest way to get the key, value, and metadata of a message with @KafkaListener is to accept a ConsumerRecord in your listener method instead of receiving only the payload.
@KafkaListener(topics = "any-topic")
void listener(ConsumerRecord<String, String> record) {
System.out.println(record.key());
System.out.println(record.value());
System.out.println(record.partition());
System.out.println(record.topic());
System.out.println(record.offset());
}
It does not use the nice annotations, but it works. Also, if you want to receive records from a Kafka topic, process them, and send them to another Kafka topic, I would recommend taking a look at the Kafka Streams API.