Kafka consumer to dynamically detect newly added topics

I'm using KafkaConsumer to consume messages from a Kafka server (topics).

  • It works fine for topics created before the consumer code is started.

The problem is that it does not work for topics created dynamically (that is, after the consumer code has started), even though the API documentation linked below says pattern subscription should pick up dynamically created topics.

Kafka version used: 0.9.0.1

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Here is the Java code:

    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    import java.util.regex.Pattern;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Pattern r = Pattern.compile("siddu(\\d)*");

    // HandleRebalance is my own ConsumerRebalanceListener implementation (not shown)
    consumer.subscribe(r, new HandleRebalance());
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(partition.partition() + ": " + record.offset() + ": " + record.value());
                }
                // Commit the offset following the last record consumed from this partition
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    } finally {
        consumer.close();
    }

NOTE: My topic names match the regular expression, and if I restart the consumer it starts reading the messages pushed to the new topic.

Any help is really appreciated...

asked Mar 22 '16 by siddu

People also ask

How do you pass topics dynamically to a Kafka listener?

You cannot "dynamically pass topics to Kafka listener "; you have to programmatically create a listener container instead.

Can a Kafka consumer read from multiple topics?

Yes, Kafka's design allows consumers from one consumer group to consume messages from multiple topics.
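
For example, a single consumer can subscribe to several topics at once (topic names below are placeholders, reusing the consumer from the question):

    consumer.subscribe(Arrays.asList("orders", "payments"));   // java.util.Arrays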

Does Kafka create topics automatically?

By default, Apache Kafka on HDInsight doesn't enable automatic topic creation. You can enable auto topic creation for existing clusters using Apache Ambari. You can also enable auto topic creation when creating a new Kafka cluster using an Azure Resource Manager template.
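
Outside HDInsight, on a self-managed Apache Kafka broker, the equivalent switch is the broker setting below (shown as an assumption about your server.properties; vanilla Kafka enables it by default):

    # server.properties (broker side)
    auto.create.topics.enable=true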

Can a Kafka consumer filter messages before polling all of them from a topic?

Kafka doesn't support filtering on the consumer side. If a consumer needs only a subset of the messages published to a topic, it has to read everything and discard what it doesn't need. This is inefficient because every message must be deserialized before that decision can be made.
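
So the usual workaround is to filter on the client after poll(). A minimal sketch, reusing the consumer from the question; the value check and the handle() call are placeholders for your own logic:

    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        if (record.value().contains("ORDER")) {   // hypothetical filter condition
            handle(record);                       // placeholder for your own processing
        }
    }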


1 Answer

There was an answer to this in the Apache Kafka mailing list archives. I am copying it below:

The consumer supports a configuration option "metadata.max.age.ms" which basically controls how often topic metadata is fetched. By default, this is set fairly high (5 minutes), which means it will take up to 5 minutes to discover new topics matching your regular expression. You can set this lower to discover topics quicker.

So in your props you can set:

props.put("metadata.max.age.ms", 5000);

This will cause your consumer to find out about new topics every 5 seconds.
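
For context, a minimal sketch of the question's setup with that property added (values are just examples; HandleRebalance is the asker's own rebalance listener):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("metadata.max.age.ms", "5000");   // re-fetch topic metadata every 5 s
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Pattern.compile("siddu(\\d)*"), new HandleRebalance());
    // Topics created after startup that match the pattern are now picked up within ~5 seconds.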

answered Sep 29 '22 by bhspencer