I have a Kafka Streams application consuming from and producing to a Kafka cluster with 3 brokers and a replication factor of 3. Apart from the internal consumer offsets topic (50 partitions), every other topic has a single partition.
When the brokers attempt a preferred replica election, the Streams app (which is running on a completely different instance than the brokers) fails with the error:
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
...
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
Is it normal that the Streams app attempts to be the leader for the partition, given that it's running on a server that's not part of the Kafka cluster?
I can reproduce this behaviour on demand by:
bin/kafka-preferred-replica-election.sh --zookeeper localhost
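(As an aside, on newer broker and client versions the same preferred election can also be triggered from Java via the Admin API instead of the shell script. This is only a sketch under that assumption: the electLeaders call requires Kafka clients 2.4+, which is newer than the setup described here, and localhost:9092 is a placeholder broker address.)
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.ElectionType;

import java.util.Properties;

public class TriggerPreferredElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        try (AdminClient admin = AdminClient.create(props)) {
            // Passing null for the partition set requests a preferred leader election for all partitions.
            admin.electLeaders(ElectionType.PREFERRED, null).partitions().get();
        }
    }
}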
My issue seems similar to this reported failure, so I'm wondering whether this is a new Kafka Streams bug. My full stack trace is identical to the gist linked in that report (here).
Another potentially interesting detail: during the leader election, I get these messages in the broker's controller.log:
[2017-04-12 11:07:50,940] WARN [Controller-3-to-broker-3-send-thread], Controller 3's connection to broker BROKER-3-HOSTNAME:9092 (id: 3 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to BROKER-3-HOSTNAME:9092 (id: 3 rack: null) failed
at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
I initially thought this connection error was to blame, but after the election crashes the Streams app, simply restarting the app makes it work normally until the next election, without my touching the brokers at all.
All servers (3 Kafka brokers and the Streams app) are running on EC2 instances.
When a leader broker shuts down, leadership moves to another broker that is in sync, i.e. a broker that has fully replicated the current state and is not lagging behind.
Whenever a new topic is created, Kafka runs its leader election algorithm to determine the preferred leader of each partition. The first replica in the replica list is the one elected as the preferred leader.
If the leader fails, one of the followers automatically becomes the new leader. Each broker acts as a leader for some of its partitions and a follower for others, so load is balanced across the cluster.
In Kafka, an unclean leader election occurs when an out-of-sync broker ("unclean" because it has not finished replicating the latest updates from the previous leader) becomes the new leader.
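To make this concrete, here is a minimal sketch (not from the original post) of inspecting partition leadership with the Java AdminClient; the topic name my-topic and the address localhost:9092 are placeholders. The first entry of replicas() is the preferred leader, while leader() is whichever broker currently holds leadership:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Collections;
import java.util.Properties;

public class ShowPartitionLeaders {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(Collections.singletonList("my-topic"))
                    .all().get().get("my-topic");
            for (TopicPartitionInfo p : desc.partitions()) {
                // replicas() is ordered: the first replica is the preferred leader;
                // isr() lists the replicas that are caught up and eligible to take over.
                System.out.printf("partition %d: leader=%d, preferred=%d, isr=%s%n",
                        p.partition(), p.leader().id(), p.replicas().get(0).id(), p.isr());
            }
        }
    }
}
Running this before and after a preferred replica election shows leadership moving back to the first replica in the list.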
This is fixed in Kafka 0.10.2.1. If you can't upgrade to that version, make sure you set these two parameters in your Streams config:
final Properties props = new Properties();
...
// Retry transient send failures (such as NotLeaderForPartitionException during a leader election)
// instead of failing the task immediately.
props.put(ProducerConfig.RETRIES_CONFIG, 10);
// Keep the consumer in the group even when a poll cycle takes a long time (e.g. during retries or state restoration).
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
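For context, here is a rough sketch of where those two settings sit in a complete Streams setup; the application id, bootstrap address, topic names and String serdes are placeholders, and the StreamsBuilder API shown assumes Kafka Streams 1.0 or later rather than the 0.10.2.x discussed above:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class ResilientStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // The two settings from the answer above: retry transient produce errors during
        // leader elections, and keep the consumer in the group while that happens.
        props.put(ProducerConfig.RETRIES_CONFIG, 10);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");                     // placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}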