
Cannot connect to Kafka from Flask in a dockerized environment

I'm trying to build a Flask app that uses Kafka as an interface. I used the Python connector kafka-python and a Docker image for Kafka, spotify/kafkaproxy.

Below is the docker-compose file.

version: '3.3'
services:
  kafka:
    image: spotify/kafkaproxy
    container_name: kafka_dev
    ports:
      - '9092:9092'
      - '2181:2181'
    environment:
      - ADVERTISED_HOST=0.0.0.0
      - ADVERTISED_PORT=9092
      - CONSUMER_THREADS=1
      - TOPICS=PROFILE_CREATED,IMG_RATED
      - ZK_CONNECT=kafka7zookeeper:2181/root/path
  flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
      - '9000:5000'
    volumes:
      - ./flask-app:/app
    depends_on:
      - kafka

Below is the Python snippet I used to connect to Kafka. Here I used the Kafka container's alias, kafka, to connect, since Docker takes care of mapping the alias to its IP address.

from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:9092']

# KafkaConsumer takes topic names as separate positional arguments, so unpack the list.
consumer = KafkaConsumer(*TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)

I got a NoBrokersAvailable error. From this, I understand that the Flask app could not find the Kafka server.

Traceback (most recent call last):
  File "./app.py", line 11, in <module>
    consumer = KafkaConsumer("PROFILE_CREATED", bootstrap_servers=BOOTSTRAP_SERVERS)
  File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 340, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 219, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 819, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

Other Observations:

  1. I was able to run ping kafka from the Flask container and get packets from the Kafka container.
  2. When I run the Flask app locally, trying to connect to the Kafka container by setting BOOTSTRAP_SERVERS = ['localhost:9092'], it works fine.

Shashank asked Nov 07 '22 23:11


1 Answer

UPDATE

As mentioned by cricket_007, if you use the docker-compose file provided below, you should connect to Kafka from another container at kafka:29092. Your code would then look like this:

from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:29092']

# KafkaConsumer takes topic names as separate positional arguments, so unpack the list.
consumer = KafkaConsumer(*TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)

END UPDATE
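Because the broker is reached at kafka:29092 from inside the Docker network but at localhost:9092 from the host, one option is to read the address from the environment instead of hard-coding it. The following is only a minimal sketch: the variable name KAFKA_BOOTSTRAP_SERVERS and the helpers bootstrap_servers and make_consumer are assumptions made for illustration, not part of kafka-python or of the compose file.

```python
import os


def bootstrap_servers(env=os.environ):
    """Return the broker list from KAFKA_BOOTSTRAP_SERVERS (a hypothetical
    variable name), falling back to the in-network address kafka:29092."""
    raw = env.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:29092")
    # Accept a comma-separated list and ignore stray whitespace or empty items.
    return [item.strip() for item in raw.split(",") if item.strip()]


def make_consumer(*topics):
    """Build a consumer for the given topics (hypothetical helper)."""
    # Imported lazily so bootstrap_servers() can be used without kafka-python.
    from kafka import KafkaConsumer
    return KafkaConsumer(*topics, bootstrap_servers=bootstrap_servers())
```

With something like this, the Flask container could leave the variable unset, while a local run on the host would set KAFKA_BOOTSTRAP_SERVERS=localhost:9092.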

I would recommend using the Kafka images from Confluent Inc. They provide all sorts of ready-to-use example setups based on docker-compose, and they keep them up to date.

Try this out:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
      - '9000:5000'
    volumes:
      - ./flask-app:/app

I used this docker-compose.yml and added your service on top. Please note that:

The config used here exposes port 9092 for external connections to the broker, i.e. those from outside the Docker network. This could be from the host machine running Docker, or from further afield if you've got a more complicated setup. In the latter case, you will need to change the value 'localhost' in KAFKA_ADVERTISED_LISTENERS to one that resolves to the Docker host from those remote clients.
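To make the two advertised addresses easier to see, here is a small sketch that splits the KAFKA_ADVERTISED_LISTENERS value from the compose file into listener names and addresses. The function name parse_advertised_listeners is made up for this illustration; it only parses the string and does not talk to a broker.

```python
def parse_advertised_listeners(value):
    """Map each listener name to the (host, port) that clients are told to use."""
    listeners = {}
    for entry in value.split(","):
        # "PLAINTEXT://kafka:29092" -> name "PLAINTEXT", address "kafka:29092"
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


ADVERTISED = "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
listeners = parse_advertised_listeners(ADVERTISED)

# Address for clients inside the Docker network (e.g. the Flask container).
print(listeners["PLAINTEXT"])       # ('kafka', 29092)
# Address for clients on the Docker host.
print(listeners["PLAINTEXT_HOST"])  # ('localhost', 9092)
```

After the initial bootstrap connection, a client keeps talking to the address the broker advertises for that listener, which is why a container should use kafka:29092 and a process on the host should use localhost:9092.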

Make sure you check out the other examples; they may be useful for you, especially when moving to production environments: https://github.com/confluentinc/cp-docker-images/tree/5.0.1-post/examples

Also worth checking:

It seems that you need to specify the api_version to avoid this error. For more details check here.

Version 1.3.5 of this library (which is the latest on PyPI) only lists API versions 0.8.0 to 0.10.1. So unless you explicitly set api_version to (0, 10, 1), the client library's attempt to discover the broker version will cause a NoBrokersAvailable error.

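from kafka import KafkaProducer

# URL, CLIENT_ID and JsonSerializer are assumed to be defined elsewhere in the quoted project.
producer = KafkaProducer(
    bootstrap_servers=URL,
    client_id=CLIENT_ID,
    value_serializer=JsonSerializer.serialize,
    api_version=(0, 10, 1),  # skips the broker version probe
)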

This should work. Interestingly, setting api_version only fixes the issue by accident, according to this:

When you set api_version the client will not attempt to probe brokers for version information. So it is the probe operation that is failing. One large difference between the version probe connections and the general connections is that the former only attempts to connect on a single interface per connection (per broker), whereas the latter (general operation) will cycle through all interfaces continually until a connection succeeds. #1411 fixes this by switching the version probe logic to attempt a connection on all found interfaces.
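The idea of cycling through all interfaces can be illustrated with the standard library alone. This is not kafka-python's internal code, just a rough sketch of the technique: resolve the host name, then try each returned address until one accepts a TCP connection. The function name first_reachable is hypothetical.

```python
import socket


def first_reachable(host, port, timeout=2.0):
    """Try every address the name resolves to and return the first one that
    accepts a TCP connection, or None if none of them does."""
    candidates = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    for family, socktype, proto, _canonname, sockaddr in candidates:
        try:
            with socket.socket(family, socktype, proto) as sock:
                sock.settimeout(timeout)
                sock.connect(sockaddr)
                return sockaddr
        except OSError:
            # This address did not answer; move on to the next one.
            continue
    return None
```

Run from inside the Flask container, a call such as first_reachable("kafka", 29092) would show whether any resolved address of the broker accepts connections, which is a stronger check than ping.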

The actual issue is described here

lloiacono answered Nov 13 '22 01:11