I am planning to use the Spring Kafka client to consume messages from, and produce messages to, a Kafka cluster in a Spring Boot application. I see support for custom headers in Kafka 0.11 as detailed here. While it is available for native Kafka producers and consumers, I don't see support for adding or reading custom headers in Spring Kafka.
I am trying to implement a DLQ for messages based on a retry count that I was hoping to store in the message header without having to parse the payload.
Well, Spring Kafka has supported headers since version 2.0: https://docs.spring.io/spring-kafka/docs/2.1.2.RELEASE/reference/html/_reference.html#headers
You can take a KafkaHeaderMapper instance and use it to populate headers on the Message before sending it via KafkaTemplate.send(Message<?> message). Or you can use the plain KafkaTemplate.send(ProducerRecord<K, V> record).
When you receive records using KafkaMessageListenerContainer, the KafkaHeaderMapper can be supplied there via a MessagingMessageConverter injected into the RecordMessagingMessageListenerAdapter.
So, any custom headers can be transferred either way.
I was looking for an answer when I stumbled upon this question. However, I'm using the ProducerRecord<?, ?> class instead of Message<?>, so the header mapper does not seem to be relevant.
Here is my approach to add a custom header:
var record = new ProducerRecord<String, String>(topicName, "Hello World");
record.headers().add("foo", "bar".getBytes());
kafkaTemplate.send(record);
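Since header values are raw bytes, the retry count from the question has to be converted to and from a byte[] somewhere. The following is only a minimal sketch of one way to do that, using nothing beyond the JDK. The class name RetryCountCodec, the 4-byte big-endian layout, and the choice to treat a missing or malformed value as zero retries are all assumptions made for illustration; they are not part of Kafka or Spring Kafka.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: converts a retry count to and from a Kafka header value (raw bytes). */
final class RetryCountCodec {

    private RetryCountCodec() {}

    /** Encodes the count as 4 big-endian bytes, usable as a header value. */
    static byte[] encode(int retryCount) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(retryCount).array();
    }

    /** Decodes a header value; a missing or malformed value is treated as "not retried yet". */
    static int decode(byte[] headerValue) {
        if (headerValue == null || headerValue.length != Integer.BYTES) {
            return 0;
        }
        return ByteBuffer.wrap(headerValue).getInt();
    }

    /** Returns true once the retries are used up and the message should go to the DLQ. */
    static boolean shouldDeadLetter(byte[] headerValue, int maxRetries) {
        return decode(headerValue) >= maxRetries;
    }
}
```

On the producing side this could be used as record.headers().add("retry-count", RetryCountCodec.encode(1)), where "retry-count" is an arbitrary header name chosen for this sketch. On the consuming side, record.headers().lastHeader("retry-count") returns the most recent header with that key, or null if it is absent, so a null check is needed before passing its value() to decode.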
Now to read the headers (before consuming), I've added a custom interceptor.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;

@Slf4j
public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> {

    // Called with each polled batch before it is handed to the listener.
    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
        Set<TopicPartition> partitions = records.partitions();
        partitions.forEach(partition -> interceptRecordsFromPartition(records.records(partition)));
        return records;
    }

    private void interceptRecordsFromPartition(List<ConsumerRecord<Object, Object>> records) {
        records.forEach(record -> {
            // Collect every header stored under the key "MyHeader".
            var myHeaders = new ArrayList<Header>();
            record.headers().headers("MyHeader").forEach(myHeaders::add);
            log.info("My Headers: {}", myHeaders);
            // Do with header as you see fit
        });
    }

    @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}
The final bit is to register this interceptor with the Kafka Consumer Container with the following (Spring Boot) configuration:
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class MessagingConfiguration {

    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
        Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }
}