Below is my configuration:
<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
        kafka-consumer-context-ref="consumerContext"
        auto-startup="true"
        channel="inputFromKafka">
    <int:poller fixed-delay="1" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>
Messages on inputFromKafka then go through the transformation below:
public Message<?> transform(final Message<?> message) {
    System.out.println("KAFKA Message Headers " + message.getHeaders());
    final Map<String, Map<Integer, List<Object>>> origData =
            (Map<String, Map<Integer, List<Object>>>) message.getPayload();
    // some code to figure-out the nonPartitionedData
    return MessageBuilder.withPayload(nonPartitionedData).build();
}
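The elided step is not shown in the question. As a rough sketch, assuming nonPartitionedData is simply every message from the topic -> partition -> messages map collected into one list, it might look like this (the class name PayloadFlattener and the method flatten are hypothetical, not part of the original code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; the name and behaviour are assumptions for illustration.
class PayloadFlattener {

    // Collects every message from the topic -> partition -> messages map
    // into a single list, following the iteration order of the maps.
    static List<Object> flatten(Map<String, Map<Integer, List<Object>>> origData) {
        List<Object> nonPartitionedData = new ArrayList<>();
        for (Map<Integer, List<Object>> partitions : origData.values()) {
            for (List<Object> messages : partitions.values()) {
                nonPartitionedData.addAll(messages);
            }
        }
        return nonPartitionedData;
    }

    public static void main(String[] args) {
        Map<Integer, List<Object>> partitions = new LinkedHashMap<>();
        partitions.put(0, List.of("a", "b"));
        partitions.put(1, List.of("c"));

        Map<String, Map<Integer, List<Object>>> origData = new LinkedHashMap<>();
        origData.put("myTopic", partitions);

        System.out.println(flatten(origData)); // prints [a, b, c]
    }
}
```

Note that the payload shape itself carries only the topic name and partition id, so nothing about the message key can be recovered at this point.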
The print statement above always prints the same two headers, regardless of the message:
KAFKA Message Headers {id=9c8f09e6-4b28-5aa1-c74c-ebfa53c01ae4, timestamp=1437066957272}
When sending the Kafka message I passed some headers, including KafkaHeaders.MESSAGE_KEY, but I am not getting that back either. Is there a way to accomplish this?
Unfortunately it doesn't work that way...
The Producer part (KafkaProducerMessageHandler) looks like this:
this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());
As you can see, we don't send any message headers to the Kafka topic. Only the payload is sent, under that messageKey, as specified by the Kafka protocol.
On the other side, the consumer (KafkaHighLevelConsumerMessageSource) uses this logic:
if (!payloadMap.containsKey(messageAndMetadata.partition())) {
    final List<Object> payload = new ArrayList<Object>();
    payload.add(messageAndMetadata.message());
    payloadMap.put(messageAndMetadata.partition(), payload);
}
As you can see, the messageKey is ignored here.
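To make the effect concrete, here is a self-contained sketch of that grouping. It is not the actual Spring Integration source: Record is a hypothetical stand-in for Kafka's MessageAndMetadata, and the sketch assumes that later messages for an already-seen partition are appended to the same list (the quoted snippet shows only the first-message branch).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartitionGrouping {

    // Hypothetical stand-in for Kafka's MessageAndMetadata; not the real class.
    static final class Record {
        final int partition;
        final String key;
        final Object message;

        Record(int partition, String key, Object message) {
            this.partition = partition;
            this.key = key;
            this.message = message;
        }
    }

    // Groups message bodies by partition id. The key field is never read,
    // so it cannot appear in the resulting payload.
    static Map<Integer, List<Object>> groupByPartition(List<Record> records) {
        Map<Integer, List<Object>> payloadMap = new LinkedHashMap<>();
        for (Record r : records) {
            payloadMap.computeIfAbsent(r.partition, p -> new ArrayList<>()).add(r.message);
        }
        return payloadMap;
    }

    public static void main(String[] args) {
        List<Record> records = List.of(
                new Record(0, "k1", "v1"),
                new Record(1, "k2", "v2"),
                new Record(0, "k3", "v3"));
        System.out.println(groupByPartition(records)); // prints {0=[v1, v3], 1=[v2]}
    }
}
```

The keys k1, k2 and k3 are absent from the output, which mirrors why the polled adapter delivers a payload with only the default id and timestamp headers.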
The KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) is what you need. It does this before sending the message to the channel:
KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(generateMessageId, generateTimestamp);
Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
rawHeaders.put(KafkaHeaders.MESSAGE_KEY, key);
rawHeaders.put(KafkaHeaders.TOPIC, metadata.getPartition().getTopic());
rawHeaders.put(KafkaHeaders.PARTITION_ID, metadata.getPartition().getId());
rawHeaders.put(KafkaHeaders.OFFSET, metadata.getOffset());
rawHeaders.put(KafkaHeaders.NEXT_OFFSET, metadata.getNextOffset());
if (!this.autoCommitOffset) {
    rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);
}