Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

testing kafka consumer and producer failed on connection

I have been trying to test a kafka installation and using the guide created a producer and consumer. When trying to retrieve a message I get the following error:

 WARN Session 0x0 for server null, unexpected error, closing socket connection and 
 attempting reconnect (org.apache.zookeeper.ClientCnxn)
 java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1146)
 [2014-03-04 18:01:20,628] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
 [2014-03-04 18:01:21,315] INFO Opening socket connection to server kafka-test/192.xxxxxx.110:2182 (org.apache.zookeeper.ClientCnxn)
 [2014-03-04 18:01:21,418] INFO Session: 0x0 closed (org.apache.zookeeper.ZooKeeper)
 Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 6000
    at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:880)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
    at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:151)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:112)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:123)
    at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
    at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:178)
    at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
 [2014-03-04 18:01:21,419] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
like image 323
vbNewbie Avatar asked Mar 06 '14 16:03

vbNewbie


People also ask

How do I verify if Kafka consumer consumed messages?

Approach can be as follow: After consuming each chunk application should produce message with status (Consumed, and chunk number) Second application (Kafka Streams once) should aggregate result and, when process messages with all chunks produce final message, that file is processed.

How do I know if Kafka consumer is running?

You can use consumer. assignment() , it will return set of partitions and verify whether all of the partitions are assigned which are available for that topic.

How does Kafka deal with consumer failure?

ERROR HANDLING IN CONSUMER Consumer is trying to consume data from Kafka topic but the connection to broker is not established because broker is not available. In that case, consumer should retry to consume data within some time intervals. Kafka records are stored in the topics.


2 Answers

Check your zookeeper connection with telnet command:

telnet 192.xxxxxx.110 2181

You probably get an error, in which case check that the process is running:

ps -ef | grep "zookeeper.properties"

If it's not running, start it by going into kafka home directory:

bin/zookeeper-server-start.sh config/zookeeper.properties &

like image 90
nafooesi Avatar answered Oct 14 '22 03:10

nafooesi


Kafka

Looks like you're not connecting to Zookeeper correctly. I'm not sure of your setup (multi-machine, VMs, containers) so it's hard to say what's wrong. From the debug output I see the following line hinting at your expected Zookeeper IP:

[2014-03-04 18:01:21,315] INFO Opening socket connection to server kafka-test/192.xxxxxx.110:2182 (org.apache.zookeeper.ClientCnxn)

Kafka looks for Zookeeper at the address specified by the zookeeper.connect configuration property in the $KAFKA_HOME/config/server.properties file. Be sure to edit that before starting Kafka. Also, try giving the actual public IP of your Zookeeper instance, not just 127.0.0.1 as that solves a lot of confusion if you're running in containers. In your case it looks like it would be: zookeeper.connect=192.xxxxxx.110:2182

Also relevant to the Kafka config if you're running on AWS or operating in a container, don't forget to update the following two configuration properties to make sure clients who connect to Kafka see the correct public IP

  • advertised.host.name
  • advertised.port

and Kafka sees the correct internal IP

  • host.name
  • port

Zookeeper

Zookeeper has some gotchas when setting it up as well. On your Zookeeper instance, don't forget to edit the server configuration property in the zoo.cfg (usually in /etc/zookeeper/conf) file to point to the correct IP for your Zookeeper instance. In your case probably the following:

server.1=192.xxxxxx.110:2888:3888

Those last two ports (2888 3888) are only needed if you're running a Zookeeper cluster (for followers to connect to the leader and Zookeeper leader election, respectively, so be sure to unblock them on firewallish things if you have multiple Zookeeper servers).

like image 40
Joel B Avatar answered Oct 14 '22 04:10

Joel B