According to the documentation on kafka javadocs if I:
A rebalance should occur, which makes the consumer read from that new topic. But that's not happening.
If I stop and start the consumer, it does pick up the new topic. So I know the new topic matches the pattern. There's a possible duplicate of this question in https://stackoverflow.com/questions/37120537/whitelist-filter-in-kafka-doesnt-pick-up-new-topics but that question got nowhere.
I'm seeing the kafka logs and there are no errors, it just doesn't trigger a rebalance. The rebalance is triggered when consumers join or die, but not when new topics are created (not even when partitions are added to existing topics, but that's another subject).
I'm using kafka 0.10.0.0, and the official Java client for the "New Consumer API", meaning broker GroupCoordinator instead of fat client + zookeeper.
This is the code for the sample consumer:
public class SampleConsumer {
public static void main(String[] args) throws IOException {
KafkaConsumer<String, String> consumer;
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
properties.setProperty("group.id", "my-group");
System.out.println(properties.get("group.id"));
consumer = new KafkaConsumer<>(properties);
}
Pattern pattern = Pattern.compile("mytopic.+");
consumer.subscribe(pattern, new SampleRebalanceListener());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("%s %s\n", record.topic(), record.value());
}
}
}
}
In the producer, I'm sending messages to topics named mytopic1, mytopic2, etc.
Patterns are pretty much useless if the rebalance is not triggered.
Do you know why the rebalance is not happening?
The documentation mentions "The pattern matching will be done periodically against topics existing at the time of check.". It turns out the "periodically" corresponds to the metadata.max.age.ms property. By setting that property (inside "consumer.props" in my code sample) to i.e. 5000 I can see it detects new topics and partitions every 5 seconds.
This is as designed, according to this jira ticket https://issues.apache.org/jira/browse/KAFKA-3854:
The final note on the JIRA stating that a later created topic that matches a consumer's subscription pattern would not be assigned to the consumer upon creation seems to be as designed. A repeat subscribe() to the same pattern would be needed to handle that case.
The refresh metadata polling does the "repeat subscribe()" mentioned in the ticket.
This is confusing coming from Kafka 0.8 where there was true triggering based on zookeper watches, instead of polling. IMO 0.9 is more of a downgrade for this scenario, instead of "just in time" rebalancing, this becomes either high frequency polling with overhead, or low frequency polling with long times before it reacts to new topics/partitions.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With