Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka Consumer: No entry found for connection

I am trying to check the kafka consumer by consuming the data from a topic on a remote Kafka cluster. I am getting the following error when I use the kafka-console-consumer.sh:

 ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
java.lang.IllegalStateException: No entry found for connection 2147475658
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330)
    at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:885)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:276)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:655)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:635)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
    at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:436)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:76)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages

Here is the command that I use:

./bin/kafka-console-consumer.sh --bootstrap-server SSL://{IP}:{PORT},SSL://{IP}:{PORT},SSL://{IP}:{PORT} --consumer.config ./config/consumer.properties --topic MYTOPIC --group MYGROUP

Here is the ./config/consumer.properties file:

bootstrap.servers=SSL://{IP}:{PORT},SSL://{IP}:{PORT},SSL://{IP}:{PORT}

# consumer group id
group.id=MYGROUP

# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
auto.offset.reset=earliest

#### Security
security.protocol=SSL
ssl.key.password=test1234
ssl.keystore.location=/opt/kafka/config/certs/keystore.jks
ssl.keystore.password=test1234
ssl.truststore.location=/opt/kafka/config/certs/truststore.jks
ssl.truststore.password=test1234

Do you have any idea what the problem is?

like image 547
Nooshin Avatar asked Jan 30 '19 15:01

Nooshin


2 Answers

I have found the problem. It was a DNS problem at the end. I was reaching out the Kafka brokers by the IP addresses, but the broker replies with DNS name. After setting the DNS names on the consumer side, it started working again.

like image 162
Nooshin Avatar answered Oct 27 '22 07:10

Nooshin


I had this problem (with consumers and producers) when running Kafka and Zookeeper as Docker containers.

The solution was to set advertised.listeners in the config/server.properties file of the Kafka brokers, so that it contains the IP address of the container, e.g.

advertised.listeners=PLAINTEXT://172.15.0.8:9092

See https://github.com/maxant/kafkaplayground/blob/master/start-kafka.sh for an example of a script used to start Kafka inside the container after setting up the properties file correctly.

like image 20
Ant Kutschera Avatar answered Oct 27 '22 06:10

Ant Kutschera