
Adding custom header using Spring Kafka

I am planning to use the Spring Kafka client to consume and produce messages from a Kafka setup in a Spring Boot application. Kafka 0.11 added support for custom headers, as detailed here. That support is available for native Kafka producers and consumers, but I don't see a way to add or read custom headers in Spring Kafka.

I am trying to implement a dead-letter queue (DLQ) for messages based on a retry count, which I was hoping to store in a message header so that I don't have to parse the payload.

Annu asked Feb 15 '18 00:02



2 Answers

Spring Kafka has provided header support since version 2.0: https://docs.spring.io/spring-kafka/docs/2.1.2.RELEASE/reference/html/_reference.html#headers

You can take a KafkaHeaderMapper instance and use it to populate headers on the Message before sending it via KafkaTemplate.send(Message<?> message). Alternatively, you can use the plain KafkaTemplate.send(ProducerRecord<K, V> record) and add headers to the record yourself.

When you receive records using KafkaMessageListenerContainer, the KafkaHeaderMapper can be supplied via a MessagingMessageConverter injected into the RecordMessagingMessageListenerAdapter.

So, any custom headers can be transferred either way.

Artem Bilan answered Oct 19 '22 09:10



I was looking for an answer when I stumbled upon this question. However, I'm using the ProducerRecord<?, ?> class instead of Message<?>, so the header mapper does not seem to be relevant.

Here is my approach to add a custom header:

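import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;
var record = new ProducerRecord<String, String>(topicName, "Hello World");
// Header values are raw bytes; encode with an explicit charset rather than the platform default.
record.headers().add("foo", "bar".getBytes(StandardCharsets.UTF_8));
kafkaTemplate.send(record);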
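
Because header values are plain byte arrays, the retry count from the question needs an explicit encoding before it can go into a header. The following is only a minimal sketch of one way to do that, using nothing but the JDK. The class name RetryCountHeader, the header key "retry-count", the 4-byte big-endian layout, and the limit of 3 retries are all assumptions made for illustration; none of them come from Spring Kafka or Kafka itself.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: stores a retry counter as a 4-byte big-endian header value. */
final class RetryCountHeader {

    /** Hypothetical header key; any name agreed between producer and consumer works. */
    static final String KEY = "retry-count";

    /** Assumed retry budget before a record is routed to the DLQ. */
    static final int MAX_RETRIES = 3;

    private RetryCountHeader() {}

    /** Encodes the counter into the byte[] form that record headers carry. */
    static byte[] encode(int retryCount) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(retryCount).array();
    }

    /** Decodes a header value; a missing or malformed value counts as zero retries. */
    static int decode(byte[] value) {
        if (value == null || value.length != Integer.BYTES) {
            return 0;
        }
        return ByteBuffer.wrap(value).getInt();
    }

    /** Returns the header value to attach when the record is published again. */
    static byte[] increment(byte[] value) {
        return encode(decode(value) + 1);
    }

    /** True once the record has used up its retry budget. */
    static boolean shouldDeadLetter(byte[] value) {
        return decode(value) >= MAX_RETRIES;
    }
}
```

On the producing side this would be attached with record.headers().add(RetryCountHeader.KEY, RetryCountHeader.encode(0)). On the consuming side, record.headers().lastHeader(RetryCountHeader.KEY) returns the most recent header for that key (or null if it is absent), and its value() can be passed to decode or shouldDeadLetter without touching the payload.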

Now to read the headers (before consuming), I've added a custom interceptor.

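import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;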

@Slf4j
public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> {

    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
        Set<TopicPartition> partitions = records.partitions();
        partitions.forEach(partition -> interceptRecordsFromPartition(records.records(partition)));

        return records;
    }

    private void interceptRecordsFromPartition(List<ConsumerRecord<Object, Object>> records) {
        records.forEach(record -> {
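            // Collect every header with this key; Kafka allows duplicate keys on a record.
            // Use the key the producer set ("foo" in the example above).
            var myHeaders = new ArrayList<Header>();
            record.headers().headers("MyHeader").forEach(myHeaders::add);
            log.info("My Headers: {}", myHeaders);
            // Process the headers as needed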
        });
    }

    @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}

The final bit is to register this interceptor with the Kafka Consumer Container with the following (Spring Boot) configuration:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class MessagingConfiguration {

    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
        Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

}
BitfullByte answered Oct 19 '22 09:10
