I am using kafka_2.12 version 2.3.0 and publishing data to a Kafka topic with an explicit partition and key. I need a way to consume a particular message from the topic by its key and partition combination, so that I don't have to consume all the messages and iterate to find the right one.
Right now I am only able to do this:
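KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)
consumer.subscribe(Collections.singletonList("topic"))
// poll(long) is deprecated since Kafka 2.0; poll(Duration) is its replacement
ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(100))
// Client-side filter: keep only the records whose key matches
def data = records.findAll { it.key() == key }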
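You can't "get messages by key from Kafka": the broker keeps no index on keys, and a consumer can only read a partition sequentially by offset.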
One solution, if practical, would be to have as many partitions as keys and always route messages for a key to the same partition.
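A minimal sketch of the one-partition-per-key idea, assuming the full set of keys is known up front. The class name `KeyPartitionTable` and its methods are hypothetical and not part of the Kafka client; it uses only the Java standard library and simply reserves a partition index for each distinct key.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not a Kafka class): gives every known key its own partition index. */
final class KeyPartitionTable {
    private final Map<String, Integer> partitionByKey = new LinkedHashMap<>();

    /** Keys are numbered 0, 1, 2, ... in the order given; duplicates are ignored. */
    KeyPartitionTable(List<String> keys) {
        for (String key : keys) {
            // size() is the next free index; putIfAbsent leaves existing keys untouched
            partitionByKey.putIfAbsent(key, partitionByKey.size());
        }
    }

    /** Number of partitions the topic needs (one per distinct key). */
    int partitionCount() {
        return partitionByKey.size();
    }

    /** Partition reserved for the key; fails fast on a key that was never registered. */
    int partitionFor(String key) {
        Integer partition = partitionByKey.get(key);
        if (partition == null) {
            throw new IllegalArgumentException("No partition reserved for key: " + key);
        }
        return partition;
    }
}
```

With such a table, the producer could send with new ProducerRecord<>(topic, table.partitionFor(key), key, value), and the consumer could call assign(Collections.singletonList(new TopicPartition(topic, table.partitionFor(key)))) so that it reads only that key's partition. The topic would need at least partitionCount() partitions, which is why this is only practical for a small, fixed set of keys.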
Message Key as Partition
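import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// topicPartitions is the Collection<TopicPartition> to read, defined elsewhere
kafkaConsumer.assign(topicPartitions);
kafkaConsumer.seekToBeginning(topicPartitions);

// Pull records from Kafka, keep polling until we get nothing back.
// An empty poll is treated as "end of data", which is only a heuristic.
final List<ConsumerRecord<byte[], byte[]>> allRecords = new ArrayList<>();
ConsumerRecords<byte[], byte[]> records;
do {
    // Grab records from Kafka (poll(long) is deprecated since Kafka 2.0)
    records = kafkaConsumer.poll(Duration.ofSeconds(2));
    logger.info("Found {} records in kafka", records.count());

    // Add to our array list
    records.forEach(allRecords::add);
} while (!records.isEmpty());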
Access messages of a Topic using Topic Name only
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Replace the placeholders with the names of the topics to read
consumer.subscribe(Arrays.asList("<Topic Name>", "<Topic Name>"));
while (true) {
    // poll(Duration) replaces the deprecated poll(long)
    ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}