
Kafka leader election causes Kafka Streams crash

I have a Kafka Streams application consuming from and producing to a Kafka cluster with 3 brokers and a replication factor of 3. Apart from the consumer offsets topic (50 partitions), every topic has a single partition.

When the brokers attempt a preferred replica election, the Streams app (which is running on a completely different instance than the brokers) fails with the error:

Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
    ...
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

Is it normal that the Streams app attempts to be the leader for the partition, given that it's running on a server that's not part of the Kafka cluster?

I can reproduce this behaviour on demand by:

  1. Killing one of the brokers (whereupon the other 2 take over as leader for all partitions that had the killed broker as their leader, as expected)
  2. Bringing the killed broker back up
  3. Triggering a preferred replica leader election with bin/kafka-preferred-replica-election.sh --zookeeper localhost
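The three steps above move the partition's leadership twice. The toy controller below is a simplified sketch, not Kafka's controller code: the class and method names are hypothetical, and it assumes a single partition whose replica list is [1, 2, 3]. It shows that after a restart the old leader only regains leadership when a preferred replica election runs, and every such move leaves clients with stale leader metadata until they refresh it. (A real cluster may also trigger preferred elections on its own when `auto.leader.rebalance.enable` is on.)

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified model of partition leader election; not Kafka's implementation. */
public class PreferredLeaderDemo {
    // Failover: first replica in assignment order that is alive and in the ISR; -1 if none.
    static int electOnFailure(List<Integer> assignment, Set<Integer> isr, Set<Integer> alive) {
        for (int b : assignment) {
            if (alive.contains(b) && isr.contains(b)) return b;
        }
        return -1;
    }

    // Preferred replica election: hand leadership back to the first replica if it is eligible.
    static int electPreferred(List<Integer> assignment, Set<Integer> isr, Set<Integer> alive, int currentLeader) {
        int preferred = assignment.get(0);
        return (alive.contains(preferred) && isr.contains(preferred)) ? preferred : currentLeader;
    }

    public static void main(String[] args) {
        List<Integer> assignment = List.of(1, 2, 3);        // assumed replica list for one partition
        Set<Integer> alive = new HashSet<>(List.of(1, 2, 3));
        Set<Integer> isr = new HashSet<>(List.of(1, 2, 3));
        int leader = 1;

        // Step 1: kill broker 1; it drops out of the live set and the ISR
        alive.remove(1);
        isr.remove(1);
        leader = electOnFailure(assignment, isr, alive);
        System.out.println("after kill: leader=" + leader);      // 2

        // Step 2: broker 1 restarts and catches up; in this model leadership stays on broker 2
        alive.add(1);
        isr.add(1);
        System.out.println("after restart: leader=" + leader);   // 2

        // Step 3: the preferred replica election moves leadership back to broker 1
        leader = electPreferred(assignment, isr, alive, leader);
        System.out.println("after election: leader=" + leader);  // 1
    }
}
```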

My issue seems similar to this reported failure, so I'm wondering whether this is a new Kafka Streams bug. My full stack trace is exactly the same as the gist linked in the reported failure (here).

Another potentially interesting detail is that during the leader election, I get these messages in the controller.log of the broker:

[2017-04-12 11:07:50,940] WARN [Controller-3-to-broker-3-send-thread], Controller 3's connection to broker BROKER-3-HOSTNAME:9092 (id: 3 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to BROKER-3-HOSTNAME:9092 (id: 3 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

I initially thought this connection error was to blame, but if I restart the Streams app after an election has crashed it, the app works normally until the next election, without me touching the brokers at all.

All servers (3 Kafka brokers and the Streams app) are running on EC2 instances.

asked Apr 12 '17 at 20:04 by Bogdan


People also ask

What happens when Kafka leader fails?

When a leader broker shuts down, leadership moves to another broker that is in sync, meaning one that has replicated the current state and is not behind.

How leader election happens in Kafka?

Whenever a new topic is created, Kafka assigns each partition an ordered list of replicas. The first replica in that list is the partition's preferred leader, and it is normally the one elected leader.

What happens when a leader broker of a partition fails?

If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster.
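The load-balancing point can be illustrated with a simplified round-robin placement, assuming the first replica of each partition acts as its leader. This is a hypothetical sketch: Kafka's real assignor also randomizes the starting broker and shifts follower placement, and brokers are numbered 0 to 2 here rather than by real broker ids.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical round-robin replica placement; not Kafka's actual assignment algorithm. */
public class LeaderSpreadDemo {
    // Partition p gets brokers p % n, (p + 1) % n, ... up to replicationFactor replicas.
    static List<List<Integer>> assign(int partitions, int brokers, int replicationFactor) {
        List<List<Integer>> result = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int i = 0; i < replicationFactor; i++) {
                replicas.add((p + i) % brokers);
            }
            result.add(replicas);
        }
        return result;
    }

    // Counts how many partitions each broker leads, taking the first replica as leader.
    static int[] leaderCounts(List<List<Integer>> assignment, int brokers) {
        int[] counts = new int[brokers];
        for (List<Integer> replicas : assignment) {
            counts[replicas.get(0)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // e.g. a 50-partition topic on 3 brokers with replication factor 3
        List<List<Integer>> a = assign(50, 3, 3);
        System.out.println(Arrays.toString(leaderCounts(a, 3))); // [17, 17, 16]
    }
}
```

With replication factor 3 on 3 brokers, every broker holds a replica of every partition, so each one leads roughly a third of them and follows for the rest.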

What is unclean leader election Kafka?

In Kafka, an unclean leader election occurs when an unclean broker (“unclean” because it has not finished replicating the latest data updates from the previous leader) becomes the new leader.
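The difference between a clean and an unclean election can be sketched as follows. This is a simplified model, not Kafka's implementation; the method name is hypothetical, and the boolean parameter stands in for the real broker/topic setting `unclean.leader.election.enable` (which defaults to false since Kafka 0.11). When no in-sync replica is alive, a clean election leaves the partition offline, while an unclean one promotes a lagging replica and may lose data.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical model of clean vs. unclean leader election; not Kafka's implementation. */
public class UncleanElectionDemo {
    // Returns the new leader for a partition whose leader failed, or -1 if it must stay offline.
    static int electLeader(List<Integer> assignment, Set<Integer> isr, Set<Integer> alive,
                           boolean uncleanLeaderElectionEnable) {
        // Clean election: a live replica that is in the ISR (fully caught up)
        for (int b : assignment) {
            if (alive.contains(b) && isr.contains(b)) return b;
        }
        // Unclean election: fall back to any live replica, accepting possible data loss
        if (uncleanLeaderElectionEnable) {
            for (int b : assignment) {
                if (alive.contains(b)) return b;
            }
        }
        return -1; // no eligible leader: the partition is unavailable
    }

    public static void main(String[] args) {
        List<Integer> assignment = List.of(1, 2, 3);
        Set<Integer> isr = Set.of(1);        // only broker 1 was in sync...
        Set<Integer> alive = Set.of(2, 3);   // ...and broker 1 has just died
        System.out.println(electLeader(assignment, isr, alive, false)); // -1 (partition offline)
        System.out.println(electLeader(assignment, isr, alive, true));  // 2 (unclean, may lose data)
    }
}
```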


1 Answer

This is fixed in Kafka 0.10.2.1. If you can't upgrade to that version, make sure these two parameters are set as follows in your Streams config:

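import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
final Properties props = new Properties();
...
props.put(ProducerConfig.RETRIES_CONFIG, 10); // retry retriable send errors such as NotLeaderForPartitionException
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); // don't evict a blocked stream thread from the group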
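Why `retries` matters: the Streams app never tries to become a partition leader. NotLeaderForPartitionException is the broker's reply when the producer inside the app sends to a broker that no longer leads the partition, because its cached metadata is stale after the election. The real producer treats this error as retriable and refreshes its metadata before resending, but only if `retries` is above 0 (the producer default in clients of that era was 0). The toy model below is a hypothetical sketch, not the Kafka client: the class, its exception, and its methods are made up to show that behaviour.

```java
/** Hypothetical toy model of a producer retrying after a leader change; not the Kafka client. */
public class RetryOnLeaderChangeDemo {
    /** Stand-in for org.apache.kafka.common.errors.NotLeaderForPartitionException. */
    static class NotLeaderException extends RuntimeException {
        NotLeaderException(int broker) { super("broker " + broker + " is not the leader"); }
    }

    private int currentLeader; // broker that actually leads the partition right now

    RetryOnLeaderChangeDemo(int leader) { this.currentLeader = leader; }

    void moveLeader(int broker) { currentLeader = broker; } // e.g. a preferred replica election

    // A broker only accepts the write if it is the current leader.
    private void produceTo(int broker) {
        if (broker != currentLeader) throw new NotLeaderException(broker);
    }

    // Sends one record; on NotLeader it refreshes metadata and retries up to `retries` times.
    // Returns the number of attempts it took.
    int send(int cachedLeader, int retries) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                produceTo(cachedLeader);
                return attempts;
            } catch (NotLeaderException e) {
                if (attempts > retries) throw e; // retries exhausted: the error reaches the app
                cachedLeader = currentLeader;    // metadata refresh: learn the new leader
            }
        }
    }

    public static void main(String[] args) {
        RetryOnLeaderChangeDemo cluster = new RetryOnLeaderChangeDemo(2);
        cluster.moveLeader(1); // leadership moves from broker 2 to broker 1
        System.out.println("retries=10: succeeded after " + cluster.send(2, 10) + " attempts"); // 2
        try {
            cluster.send(2, 0);
        } catch (NotLeaderException e) {
            System.out.println("retries=0: " + e.getMessage()); // this is where the app would crash
        }
    }
}
```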
answered Oct 19 '22 at 21:10 by Eno Thereska