 

Can talk to Zookeeper but not to the message brokers

I'm using kafka-python to produce messages for a Kafka 2.2.1 cluster (a managed cluster instance from AWS's MSK service). I'm able to retrieve the bootstrap servers and establish a network connection to them, but no message ever gets through. Instead, after each message of type A I immediately receive one of type B, and eventually one of type C:

A [INFO]    2019-11-19T15:17:19.603Z    <BrokerConnection ... <connecting> [IPv4 ('10.0.128.56', 9094)]>: Connection complete.
B [ERROR]   2019-11-19T15:17:19.605Z    <BrokerConnection ... <connected> [IPv4 ('10.0.128.56', 9094)]>: socket disconnected
C [ERROR] KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

What causes a broker node to accept a TCP connection from a hopeful producer, but then immediately close it again?

Edit

  • The topic already exists, and kafka-topics.sh --list displays it.

  • I have the same problem with all clients I've used: Kafka's kafka-console-producer.sh, kafka-python, confluent-kafka, and kafkacat

  • The Kafka cluster is in the same VPC as all my other machines, and its security group allows any incoming and outgoing traffic within that VPC.

  • However, it's managed by Amazon's Managed Streaming for Kafka (MSK) service, which means I don't have fine-grained control over the server installation settings (or even know what they are). MSK just publishes the ZooKeeper and message broker URLs for clients to use.

  • The producer runs as an AWS Lambda function, but the problem persists when I run it on a normal EC2 instance.

  • Permissions are not the issue. I have assigned the Lambda role all the AWS permissions it needs (AWS is always very explicit about which operation requires which missing permission).

  • Connectivity is not the issue. I can reach the URLs of both the zookeepers and the message brokers with standard telnet. However, issuing commands to the zookeepers works, while issuing commands to the message brokers always eventually fails. Since Kafka uses a binary protocol over TCP, I'm at a loss how to debug the problem further.

Edit

As suggested, I debugged this with

./kafkacat -b $BROKERS -L -d broker

and got:

%7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1574772202.379|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1574772202.379|BROKERFAIL|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1574772202.380|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN

So, is this a kind of mismatch between client and broker API versions? How can I recover from this, bearing in mind that I have no control over the version or the configuration of the Kafka cluster that AWS provides?

Asked Nov 19 '19 15:11 by Kilian Foth



2 Answers

I think this is related to the TLS encryption. By default, MSK spins up a cluster that accepts both PLAINTEXT and TLS, but if you grab the bootstrap servers programmatically from the cluster it will only give you the TLS ports. If that's the case for you, try using the PLAINTEXT port 9092 instead.

To authenticate the client for TLS you need to generate a certificate (https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html), then get that certificate onto your Lambda and reference it in your producer configuration.

If you are able to configure your MSK cluster as PLAINTEXT only then when you grab the bootstrap servers from the AWS SDK it will give you the PLAINTEXT port and you should be good.
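A minimal sketch of what the two producer configurations might look like with kafka-python (the broker hostname below is a made-up placeholder; the exact SSL options your cluster needs depend on its authentication setup):

```python
# Sketch: kafka-python settings for MSK's two listener types.
# The hostnames here are illustrative placeholders, not real brokers.

def producer_config(brokers, use_tls):
    """Return kwargs suitable for kafka.KafkaProducer for either listener."""
    config = {"bootstrap_servers": brokers}
    if use_tls:
        # MSK's TLS listener runs on port 9094; the client must speak SSL,
        # otherwise the broker closes the socket right after connecting --
        # which matches the "Connection complete" / "socket disconnected"
        # pair in the question.
        config["security_protocol"] = "SSL"
    else:
        # The PLAINTEXT listener (if enabled on the cluster) runs on 9092.
        config["security_protocol"] = "PLAINTEXT"
    return config

# Hypothetical usage; the result would be passed as KafkaProducer(**tls_cfg):
tls_cfg = producer_config(
    ["b-1.example.kafka.us-east-1.amazonaws.com:9094"], use_tls=True
)
```

The key point is that `security_protocol` must match the listener behind the port you connect to; kafka-python defaults to PLAINTEXT, so pointing it at the 9094 TLS port produces exactly the silent disconnect described above.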

Answered Nov 07 '22 08:11 by Garrett Hoffman


Since it doesn't work for non-python clients either, it's unlikely that it's a bug in the library.

It seems to be a networking issue.

There is a Kafka broker setting called advertised.listeners which specifies the address that clients will use after the first connection. In other words, this is what happens when a client consumes or produces:

  1. Using the bootstrap.servers, the client establishes a first connection and asks for the real address to use.

  2. The broker answers with the address specified by advertised.listeners in the broker's configuration.

  3. The client tries consuming or producing using that new address.
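As a toy sketch of why the first connection can succeed while everything after it fails (this is a model of the handshake, not a real Kafka client; both addresses are the sample hostnames used below):

```python
# Toy model of the bootstrap/metadata handshake. The broker the client
# first reaches hands back a *different* address, taken from its
# advertised.listeners config, and all real traffic goes there.

ADVERTISED = "ip-172-31-18-160.us-west-2.compute.internal:9092"  # broker config

def fetch_metadata(bootstrap):
    """Steps 1+2: connect to the bootstrap address, receive the advertised one."""
    return {"bootstrap": bootstrap, "produce_to": ADVERTISED}

meta = fetch_metadata("ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092")
# Step 3: the client now talks to meta["produce_to"]. If that internal
# hostname is unreachable from the client, producing fails even though
# the bootstrap connection worked -- which is exactly the symptom here.
```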

This is a security feature that prevents brokers which may be publicly accessible from being consumed/produced by clients that shouldn't have access.

How to diagnose

Run the following command:

$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L

which returns

Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
1 brokers:
  broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092

In this scenario, ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 is the address specified by the client, and even if the client has access to that address/port, ip-172-31-18-160.us-west-2.compute.internal:9092 is the address that will actually be used to consume/produce.

Now, if you are running Kafka in AWS MSK, it is probably managing this for you. You have to make sure that you can access the address returned by that command. If you can't, you might need to either change it or run your command from a host that has access to it.
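A quick way to check reachability of the advertised address from your producer's host is a plain TCP probe, e.g. with Python's stdlib (the host/port are placeholders; substitute whatever kafkacat reported):

```python
# Quick TCP reachability probe for the advertised listener address.
import socket

def reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Hypothetical usage, from the Lambda/EC2 host doing the producing:
# reachable("ip-172-31-18-160.us-west-2.compute.internal", 9092)
```

Note that a successful TCP connect only proves network-level access (as telnet did in the question); it does not prove the Kafka protocol handshake will succeed, so a False here pinpoints a routing/listener problem while a True does not rule out a TLS or protocol mismatch.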

Another option is to open an SSH tunnel through a bastion host that has internal access to that address.

You can find more detailed info at: https://rmoff.net/2018/08/02/kafka-listeners-explained

Answered Nov 07 '22 06:11 by JaviOverflow