
How to access Kafka headers while consuming a message?

Below is my configuration:

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
            kafka-consumer-context-ref="consumerContext"
            auto-startup="true"
            channel="inputFromKafka">
        <int:poller fixed-delay="1" time-unit="MILLISECONDS" />
    </int-kafka:inbound-channel-adapter>

Messages on inputFromKafka go through the transformation below:

public Message<?> transform(final Message<?> message) {

    System.out.println("KAFKA Message Headers " + message.getHeaders());

    final Map<String, Map<Integer, List<Object>>> origData =
            (Map<String, Map<Integer, List<Object>>>) message.getPayload();

    // some code to figure out the nonPartitionedData
    return MessageBuilder.withPayload(nonPartitionedData).build();
}

The print statement above always prints the same two headers, regardless of what was sent:

KAFKA Message Headers {id=9c8f09e6-4b28-5aa1-c74c-ebfa53c01ae4, timestamp=1437066957272}

When sending the Kafka message I passed some headers, including KafkaHeaders.MESSAGE_KEY, but I am not getting that one back either. Is there a way to accomplish this?

iamiddy asked Jul 16 '15 17:07

1 Answer

Unfortunately it doesn't work that way...

The Producer part (KafkaProducerMessageHandler) looks like this:

this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());

As you can see, we don't send any message headers to the Kafka topic: only the payload, stored under that messageKey, as the Kafka protocol specifies.

On the other side, the consumer (KafkaHighLevelConsumerMessageSource) does this:

if (!payloadMap.containsKey(messageAndMetadata.partition())) {
    final List<Object> payload = new ArrayList<Object>();
    payload.add(messageAndMetadata.message());
    payloadMap.put(messageAndMetadata.partition(), payload);
}

As you can see, the messageKey is ignored here.
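
To make the two snippets above concrete, here is a minimal plain-Java model of the round trip. It uses no Kafka or Spring classes: WireRecord, send and poll are hypothetical stand-ins invented for illustration, and the grouping in poll is an assumption about how the payload map is filled. The point is only that headers never leave the producer and the key is dropped by the polling consumer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeaderLossSketch {

    // Hypothetical stand-in for what reaches the topic: partition, key, payload. No headers.
    static final class WireRecord {
        final int partition;
        final Object key;
        final Object payload;

        WireRecord(int partition, Object key, Object payload) {
            this.partition = partition;
            this.key = key;
            this.payload = payload;
        }
    }

    // Models the producer call: the headers are accepted but never forwarded.
    static WireRecord send(int partition, Object key, Map<String, Object> headers, Object payload) {
        return new WireRecord(partition, key, payload);
    }

    // Models the polling consumer: payloads grouped by partition, key ignored.
    static Map<Integer, List<Object>> poll(List<WireRecord> records) {
        Map<Integer, List<Object>> payloadMap = new HashMap<>();
        for (WireRecord r : records) {
            payloadMap.computeIfAbsent(r.partition, p -> new ArrayList<>()).add(r.payload);
        }
        return payloadMap;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("myHeader", "never sent");

        List<WireRecord> topic = new ArrayList<>();
        topic.add(send(0, "key-1", headers, "payload-1"));
        topic.add(send(0, "key-2", headers, "payload-2"));

        // prints {0=[payload-1, payload-2]}: neither headers nor keys are visible
        System.out.println(poll(topic));
    }
}
```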

The KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) is for you! It does this before sending the message to the channel:

KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(generateMessageId, generateTimestamp);

Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
rawHeaders.put(KafkaHeaders.MESSAGE_KEY, key);
rawHeaders.put(KafkaHeaders.TOPIC, metadata.getPartition().getTopic());
rawHeaders.put(KafkaHeaders.PARTITION_ID, metadata.getPartition().getId());
rawHeaders.put(KafkaHeaders.OFFSET, metadata.getOffset());
rawHeaders.put(KafkaHeaders.NEXT_OFFSET, metadata.getNextOffset());

if (!this.autoCommitOffset) {
    rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);
}
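
As a rough illustration of what a transformer could then read, the sketch below models the header map with a plain java.util.Map. The constant values ("messageKey", "topic", and so on) are placeholders standing in for the KafkaHeaders constants, not the library's actual strings, and adapterHeaders and describe are hypothetical helpers.

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderReadSketch {

    // Placeholder names standing in for KafkaHeaders.MESSAGE_KEY, TOPIC, PARTITION_ID, OFFSET.
    static final String MESSAGE_KEY = "messageKey";
    static final String TOPIC = "topic";
    static final String PARTITION_ID = "partitionId";
    static final String OFFSET = "offset";

    // Models the adapter filling the headers before the message reaches the channel.
    static Map<String, Object> adapterHeaders(Object key, String topic, int partition, long offset) {
        Map<String, Object> rawHeaders = new HashMap<>();
        rawHeaders.put(MESSAGE_KEY, key);
        rawHeaders.put(TOPIC, topic);
        rawHeaders.put(PARTITION_ID, partition);
        rawHeaders.put(OFFSET, offset);
        return rawHeaders;
    }

    // Models a transformer looking the values up by header name.
    static String describe(Map<String, Object> headers) {
        return "key=" + headers.get(MESSAGE_KEY)
                + " topic=" + headers.get(TOPIC)
                + " partition=" + headers.get(PARTITION_ID)
                + " offset=" + headers.get(OFFSET);
    }

    public static void main(String[] args) {
        // prints key=key-1 topic=orders partition=0 offset=42
        System.out.println(describe(adapterHeaders("key-1", "orders", 0, 42L)));
    }
}
```

In the real transformer the equivalent lookup would be message.getHeaders().get(KafkaHeaders.MESSAGE_KEY). Note also that MessageBuilder.withPayload(...).build() creates a message with fresh headers, so if the Kafka headers are needed downstream they would have to be carried over, for example with copyHeaders(message.getHeaders()).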

Artem Bilan answered Oct 23 '22 06:10
