I'm trying to build a Flask app that has Kafka as an interface. I used a Python connector, kafka-python and a Docker image for Kafka, spotify/kafkaproxy .
Below is the docker-compose file.
version: '3.3'
services:
kafka:
image: spotify/kafkaproxy
container_name: kafka_dev
ports:
- '9092:9092'
- '2181:2181'
environment:
- ADVERTISED_HOST=0.0.0.0
- ADVERTISED_PORT=9092
- CONSUMER_THREADS=1
- TOPICS=PROFILE_CREATED,IMG_RATED
- ZK_CONNECT=kafka7zookeeper:2181/root/path
flaskapp:
build: ./flask-app
container_name: flask_dev
ports:
- '9000:5000'
volumes:
- ./flask-app:/app
depends_on:
- kafka
Below is the Python snippet I used to connect to kafka. Here, I used the Kafka container's alias kafka
to connect, as Docker would take care of mapping the alias to it's IP address.
from kafka import KafkaConsumer, KafkaProducer
TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:9092']
consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)
I got NoBrokersAvailable
error. From this, I could understand that the Flask app could not find the Kafka server.
Traceback (most recent call last):
File "./app.py", line 11, in <module>
consumer = KafkaConsumer("PROFILE_CREATED", bootstrap_servers=BOOTSTRAP_SERVERS)
File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 340, in __init__
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 219, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 819, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
Other Observations:
ping kafka
from the Flask container and get packets from the Kafka container. BOOTSTRAP_SERVERS = ['localhost:9092']
, it works fine.UPDATE
As mentioned by cricket_007, given that you are using the docker-compose provided below, you should use kafka:29092
to connect to Kafka from another container. So your code would look like this:
from kafka import KafkaConsumer, KafkaProducer
TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:29092']
consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)
END UPDATE
I would recommend you use the Kafka images from Confluent Inc, they have all sorts of example setups using docker-compose that are ready to use and they are always updating them.
Try this out:
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
flaskapp:
build: ./flask-app
container_name: flask_dev
ports:
- '9000:5000'
volumes:
- ./flask-app:/app
I used this docker-compose.yml and added your service on top Please note that:
The config used here exposes port 9092 for external connections to the broker i.e. those from outside the docker network. This could be from the host machine running docker, or maybe further afield if you've got a more complicated setup. If the latter is true, you will need to change the value 'localhost' in KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those remote clients
Make sure you check out the other examples, may be useful for you especially when moving to production environments: https://github.com/confluentinc/cp-docker-images/tree/5.0.1-post/examples
Also worth checking:
It seems that you need to specify the api_version to avoid this error. For more details check here.
Version 1.3.5 of this library (which is latest on pypy) only lists certain API versions 0.8.0 to 0.10.1. So unless you explicitly specify api_version to be (0, 10, 1) the client library's attempt to discover the version will cause a NoBrokersAvailable error.
producer = KafkaProducer(
bootstrap_servers=URL,
client_id=CLIENT_ID,
value_serializer=JsonSerializer.serialize,
api_version=(0, 10, 1)
)
This should work, interestingly enough setting the api_version is accidentally fixing the issue according to this:
When you set api_version the client will not attempt to probe brokers for version information. So it is the probe operation that is failing. One large difference between the version probe connections and the general connections is that the former only attempts to connect on a single interface per connection (per broker), where as the latter -- general operation -- will cycle through all interfaces continually until a connection succeeds. #1411 fixes this by switching the version probe logic to attempt a connection on all found interfaces.
The actual issue is described here
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With