I have a Kafka consumer. It seems to work for a while, and then die. It does this repeatedly. I get this exception but no other information.
org.apache.kafka.common.errors.TimeoutException:
Failed to get offsets by times in 305000 ms
305000 ms is just over 5 minutes. Is there any clue about what might cause this, or steps I could take to find out?
In case it's relevant:
I have 3 processes on different machines, using the latest Java Kafka client, version 0.10.2.0. Each machine runs 20 threads, and each thread has its own Consumer. By design, when one thread dies, all threads are killed, the process dies, and it is restarted. This means ~20 consumers die and restart at roughly the same time, which will trigger a rebalance, so it's possible the clients periodically interfere with one another. That doesn't explain why I get this exception in the first place, however.
I have three Kafka machines and three Zookeeper machines. Each client has all three Kafka machines in its bootstrap.servers configuration. The topic has 200 partitions, meaning that each thread is assigned approximately 3 partitions. The topic has a replication factor of 2.
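A minimal sketch of the per-thread consumer setup, with placeholder broker host names, group id, and topic name (not the real values):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerThread implements Runnable {
    @Override
    public void run() {
        Properties props = new Properties();
        // All three brokers are listed in bootstrap.servers (placeholder host names)
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
        props.put("group.id", "my-group");  // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // One KafkaConsumer per thread -- the consumer is not thread-safe,
        // so it is never shared between the 20 threads.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));  // placeholder topic
            while (!Thread.currentThread().isInterrupted()) {
                // 0.10.2 client: poll takes a timeout in milliseconds
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (ConsumerRecord<String, String> record : records) {
                    // process the record ...
                }
            }
        }
    }
}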
There are no errors in the Kafka or Zookeeper logs.
The following config values are set, no others.
I ran into this today. I saw two different versions of the error message, depending on whether I was using the Kafka 1.0 or the Kafka 2.0 client libraries: the 1.0 client reported "org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 305000 ms", while the 2.0 client reported "org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 30003 ms".
I received this message when trying to monitor offsets/lag with the kafka-consumer-groups command (e.g. kafka-consumer-groups --bootstrap-server {servers} --group {group} --describe). These commands are part of the Kafka/Confluent tools, but I imagine this could happen to other clients as well.
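For reference, the same "Failed to get offsets by times" timeout can be hit directly from the Java client: the command-line tools use a consumer under the hood, and (as far as I can tell) the message comes from the consumer's offset-lookup path, i.e. calls like endOffsets and offsetsForTimes. A rough, self-contained sketch with placeholder broker, group, and topic names:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetLookupCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder broker
        props.put("group.id", "offset-lookup-check");     // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);  // placeholder topic

            // End offsets, which is what lag monitoring needs; this blocks until
            // the broker answers or the request times out.
            Map<TopicPartition, Long> end =
                    consumer.endOffsets(Collections.singletonList(tp));
            System.out.println("end offsets: " + end);

            // Offsets by timestamp; a partition with no reachable leader can make
            // this surface the "Failed to get offsets by times in ... ms" error.
            Map<TopicPartition, Long> query = new HashMap<>();
            query.put(tp, System.currentTimeMillis() - 3_600_000L);
            Map<TopicPartition, OffsetAndTimestamp> byTime = consumer.offsetsForTimes(query);
            System.out.println("offsets by time: " + byTime);
        }
    }
}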
The problem seemed to be that I had a topic with a replication factor of 1 that had partitions without an assigned leader. The only way I found this was by updating the {kafka_client_dir}/libexec/config/tools-log4j.properties file to log at the DEBUG level: log4j.rootLogger=DEBUG, stderr. Note that this is the log4j config file for the Kafka/Confluent tools - YMMV for other clients. I am running them from my Mac.
When this was done, I saw the following message in the output, which alerted me to the ISR/offlineReplicas issue:
[2019-01-28 11:41:54,290] DEBUG Updated cluster metadata version 2 to Cluster(id = 0B1zi_bbQVyrfKwqiDa2kw,
nodes = [
brokerServer3:9092 (id: 3 rack: null),
brokerServer6:9092 (id: 6 rack: null),
brokerServer1:9092 (id: 1 rack: null),
brokerServer5:9092 (id: 5 rack: null),
brokerServer4:9092 (id: 4 rack: null)], partitions = [
Partition(topic = myTopicWithReplicatinFactorOne, partition = 10, leader = 6, replicas = [6], isr = [6], offlineReplicas = []),
Partition(topic = myTopicWithReplicatinFactorOne, partition = 11, leader = 1, replicas = [1], isr = [1], offlineReplicas = []),
Partition(topic = myTopicWithReplicatinFactorOne, partition = 12, leader = none, replicas = [2], isr = [], offlineReplicas = [2]),
Partition(topic = myTopicWithReplicatinFactorOne, partition = 13, leader = 3, replicas = [3], isr = [3], offlineReplicas = []),
Partition(topic = myTopicWithReplicatinFactorOne, partition = 14, leader = 4, replicas = [4], isr = [4], offlineReplicas = []),
Partition(topic = myTopicWithReplicatinFactorOne, partition = 2, leader = 4, replicas = [4], isr = [4], offlineReplicas = []),
Partition(topic = myTopicWithReplicatinFactorOne, partition = 3, leader = 5, replicas = [5], isr = [5], offlineReplicas = []),
Partition(topic = myTopicWithReplicatinFactorOne, partition = 4, leader = 6, replicas = [6], isr = [6], offlineReplicas = []),
Partition(topic = myTopicWithReplicatinFactorOne, partition = 5, leader = 1, replicas = [1], isr = [1], offlineReplicas = []),
Partition(topic = myTopicWithReplicatinFactorOne, partition = 6, leader = none, replicas = [2], isr = [], offlineReplicas = [2]),
Partition(topic = myTopicWithReplicatinFactorOne, partition = 7, leader = 3, replicas = [3], isr = [3], offlineReplicas = []),
Partition(topic = myTopicWithReplicatinFactorOne, partition = 8, leader = 4, replicas = [4], isr = [4], offlineReplicas = []),
Partition(topic = myTopicWithReplicatinFactorOne, partition = 9, leader = 5, replicas = [5], isr = [5], offlineReplicas = []),
Partition(topic = myTopicWithReplicatinFactorOne, partition = 0, leader = none, replicas = [2], isr = [], offlineReplicas = [2]),
Partition(topic = myTopicWithReplicatinFactorOne, partition = 1, leader = 3, replicas = [3], isr = [3], offlineReplicas = [])
], controller = brokerServer4:9092 (id: 4 rack: null)) (org.apache.kafka.clients.Metadata)
You can see above where it says offlineReplicas = [2] - hinting at the issue. Also, brokerServer2 was not in the list of brokers.
Ultimately, I restarted the affected broker (brokerServer2) to get it back in sync, and once this was done I had no further issues using the command-line tools. There are probably better ways to fix this than a broker restart, but it ultimately fixed the issue.