I am able to get a simple one-node Kafka (kafka_2.11-0.8.2.1) working locally on one Linux machine, but when I try to run a producer remotely I get some confusing errors.
I'm following the quickstart guide at http://kafka.apache.org/documentation.html#quickstart. I stopped the Kafka processes and deleted all the ZooKeeper & Kafka files in /tmp. I am on a local 10.0.0.0/24 network NAT-ed to an external IP address, so I modified server.properties to make the broker advertise my external address (which it registers in ZooKeeper), as per https://medium.com/@thedude_rog/running-kafka-in-a-hybrid-cloud-environment-17a8f3cfc284:
advertised.host.name=MY.EXTERNAL.IP
Then I'm running this:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
--> ...
$ export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"  # small test server!
$ bin/kafka-server-start.sh config/server.properties
--> ...
I opened up the firewall for my producer on the remote machine, then created a new topic and verified it:
$ bin/kafka-topics.sh --create --zookeeper MY.EXTERNAL.IP:2181 --replication-factor 1 --partitions 1 --topic test123
--> Created topic "test123".
$ bin/kafka-topics.sh --list --zookeeper MY.EXTERNAL.IP:2181
--> test123
However, the producer I'm running remotely gives me errors:
$ bin/kafka-console-producer.sh --broker-list MY.EXTERNAL.IP:9092 --topic test123
--> [2015-06-16 14:41:19,757] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
My Test Message
--> [2015-06-16 14:42:43,347] WARN Error while fetching metadata [{TopicMetadata for topic test123 ->
    No partition metadata for topic test123 due to kafka.common.LeaderNotAvailableException}] for topic [test123]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
--> (repeated several times)
(I disabled the whole firewall to make sure that wasn't the problem.)
The stdout errors in the Kafka startup output are repeated:

[2015-06-16 20:42:42,768] INFO Closing socket connection to /MY.EXTERNAL.IP. (kafka.network.Processor)
And the controller.log gives me this, several times:
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2015-06-16 20:44:08,128] INFO [Controller-0-to-broker-0-send-thread], Controller 0 connected to id:0,host:MY.EXTERNAL.IP,port:9092 for sending state change requests (kafka.controller.RequestSendThread)
[2015-06-16 20:44:08,428] WARN [Controller-0-to-broker-0-send-thread], Controller 0 epoch 1 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:1;CorrelationId:7;ClientId:id_0-host_null-port_9092;Leaders:id:0,host:MY.EXTERNAL.IP,port:9092;PartitionState:(test123,0) -> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:MY.EXTERNAL.IP,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread)
Running this seems to indicate that there is a leader at 0:
$ ./bin/kafka-topics.sh --zookeeper MY.EXTERNAL.IP:2181 --describe --topic test123
--> Topic:test123  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: test123  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
I reran this test and my server.log indicates that there is a leader at 0:
...
[2015-06-16 21:58:04,498] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2015-06-16 21:58:04,642] INFO Registered broker 0 at path /brokers/ids/0 with address MY.EXTERNAL.IP:9092. (kafka.utils.ZkUtils$)
[2015-06-16 21:58:04,670] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2015-06-16 21:58:04,736] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
I see this error in the logs when I send a message from the producer:
[2015-06-16 22:18:24,584] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 7; ClientId: console-producer; Topics: test123 (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
        at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
I assume this means the broker can't be found for some reason? I'm confused about what this error actually means.
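For reference, one way to check what the broker actually registered in ZooKeeper is the zookeeper-shell tool that ships with Kafka (the output shown here is illustrative of the 0.8.x registration format, not captured from my run):

$ bin/zookeeper-shell.sh MY.EXTERNAL.IP:2181
ls /brokers/ids
--> [0]
get /brokers/ids/0
--> {"jmx_port":-1,"timestamp":"...","host":"MY.EXTERNAL.IP","version":1,"port":9092}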
Setting the advertised host to localhost: if you set the advertised host to localhost, the Kafka client will successfully connect to the cluster only if it is running on the same machine as the broker.
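As a minimal sketch with the 0.8.x-style property (clients on other machines will be handed "localhost" in metadata responses and fail to connect):

advertised.host.name=localhost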
listeners determines which interfaces Kafka binds to; advertised.listeners determines the addresses clients are told to use when connecting.
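A minimal sketch of the two settings together in server.properties (broker1.example.com is a hypothetical, externally resolvable name):

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://broker1.example.com:9092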
"replication": A replicated partition contains exactly the same data of the leader. So the same message is stored multiple times. This ensures durability as the same message is located on different brokers. In case of a broker failure, Kafka can switch the leader and provide the replicated message to its clients.
For recent versions of Kafka (0.10.0 as of this writing), you don't want to use advertised.host.name at all. In fact, even the documentation states that advertised.host.name is already deprecated. Moreover, Kafka will use this not only as the "advertised" host name for the producers/consumers, but for other brokers as well (in a multi-broker environment)...which is kind of a pain if you're using a different (perhaps internal) DNS for the brokers...and you really don't want to get into the business of adding entries to the individual /etc/hosts of the brokers (ew!).

So, basically, you want the brokers to use the internal name, but the external FQDNs for the producers and consumers only. To do this, update advertised.listeners instead.
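A sketch of what that can look like on Kafka 0.10.2+ (where KIP-103 lets you name listeners), so brokers keep talking over the internal name while clients are handed the external FQDN; all hostnames below are hypothetical placeholders:

listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
advertised.listeners=INTERNAL://broker1.internal.example:9092,EXTERNAL://kafka.example.com:9093
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL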
Set advertised.host.name to a host name, not an IP address. The default is to return an FQDN using getCanonicalHostName(), but this is only best effort and falls back to an IP. See the Java docs for getCanonicalHostName().
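A quick sanity check with standard Linux tools (hostnames and IPs below are illustrative) of what name this machine reports and whether it resolves back to the right address:

$ hostname -f
broker1.internal.example
$ getent hosts broker1.internal.example
10.0.0.11   broker1.internal.example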
The trick is to get that host name to always resolve to the correct IP. For small environments I usually set up all of the hosts with all of their internal IPs in /etc/hosts. This way all machines know how to talk to each other over the internal network, by name. In fact, configure your Kafka clients by name now too, not by IP. If managing all the /etc/hosts files is a burden then set up an internal DNS server to centralize it, but internal DNS should return internal IPs. Either of these options should be less work than having IP addresses scattered throughout various configuration files on various machines.
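For example, hypothetical /etc/hosts entries, kept identical on every machine, so everything resolves by name over the internal network:

10.0.0.11  broker1.internal.example broker1
10.0.0.12  broker2.internal.example broker2
10.0.0.13  broker3.internal.example broker3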
Once everything is communicating by name all that's left is to configure external DNS with the external IPs and everything just works. This includes configuring Kafka clients with the server names, not IPs.