I have been trying to build a Flask app that has Kafka as its only interface. I want a Kafka consumer that is triggered whenever a new message arrives on the topic of interest, and that responds by pushing messages back to the Kafka stream.
I have been looking for something like the Spring implementation:
@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {
    System.out.println("Received Message in group mygroup: " + message);
}
I have looked at:
But I couldn't find anything offering this event-driven style of implementation in Python.
There is no Kafka Streams API for Python yet, but a good alternative is Faust. The examples in this section were tested against a single Zookeeper node and a single Kafka broker running locally. This is not about performance tuning, so I'm mostly using the default configurations provided by the library.
kafka-python is designed to function much like the official Java client, with a sprinkling of Pythonic interfaces. It's best used with Kafka version 0.9+. The first release was in March 2014, and it is actively maintained. Messages are sent asynchronously via send(): when called, it adds the record to a buffer and returns immediately.
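A hedged sketch of that asynchronous send() behavior with kafka-python (the broker address, topic name, and helper functions are assumptions for illustration; the kafka import is kept inside the function so the sketch loads even without a broker available):

```python
import json

def serialize(value):
    # Pure helper: encode a dict as UTF-8 JSON bytes for Kafka
    return json.dumps(value).encode("utf-8")

def send_example():
    # Assumes a broker at localhost:9092. send() buffers the record and
    # returns a future immediately instead of blocking on delivery.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
                             value_serializer=serialize)
    future = producer.send("mytopic", {"hello": "world"})
    future.add_callback(lambda md: print("delivered:", md.topic, md.partition, md.offset))
    future.add_errback(lambda exc: print("delivery failed:", exc))
    producer.flush()  # block until the buffered records are actually sent
```

The callback/errback pair is how you observe delivery results without blocking the sending thread.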
The consumer will transparently handle the failure of servers in the Kafka cluster and adapt as topic partitions are created or migrate between brokers. It also interacts with the assigned Kafka group coordinator node to allow multiple consumers to load-balance consumption of topics (requires Kafka >= 0.9.0.0).
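A hedged sketch of what joining a consumer group looks like with kafka-python (the group name, topic, and broker address are placeholders; the import stays inside the function so the sketch loads without a broker):

```python
def run_group_member(topic="mytopic", group="mygroup"):
    # Consumers that share the same group_id split the topic's partitions
    # between them; the group coordinator rebalances when members join or leave.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=["localhost:9092"],
        group_id=group,                # enables coordinated, load-balanced consumption
        auto_offset_reset="earliest",  # start from the beginning if no committed offset
    )
    for msg in consumer:
        print(msg.partition, msg.offset, msg.value)
```

Starting this function in two separate processes with the same group makes the broker divide the topic's partitions between them.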
Apache Kafka is a distributed streaming platform that can publish, subscribe to, store, and process streams of messages in real time. Its pull-based architecture reduces the pressure on heavily loaded services and makes it easy to scale.
Here is an implementation of the idea from @MickaelMaison's answer, using kafka-python.
from kafka import KafkaConsumer
import threading

BOOTSTRAP_SERVERS = ['localhost:9092']

def register_kafka_listener(topic, listener):
    # Poll Kafka in a background thread
    def poll():
        # Initialize the consumer instance
        consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)
        print("About to start polling for topic:", topic)
        consumer.poll(timeout_ms=6000)
        print("Started polling for topic:", topic)
        for msg in consumer:
            print("Key:", msg.key, " Value:", msg.value)
            listener(msg)

    print("About to register listener to topic:", topic)
    t1 = threading.Thread(target=poll, daemon=True)
    t1.start()
    print("Started a background thread")

def kafka_listener(data):
    print("Image Ratings:\n", data.value.decode("utf-8"))

register_kafka_listener('topic1', kafka_listener)
The polling is done in a different thread. Once a message is received, the listener is called by passing the data retrieved from Kafka.
A Kafka consumer has to poll the broker continuously to retrieve data.
Spring gives you this fancy API, but under the covers it calls poll() in a loop and only invokes your method when records have been retrieved.
You can easily build something similar with any of the Python clients you've mentioned. Like in Java, this is not an API directly exposed by (most) Kafka clients but instead something provided by a layer on top. It's something you need to build.
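A minimal sketch of such a layer: a plain poll loop wrapped in a listener decorator. An in-memory stand-in consumer (an assumption for illustration) is used here so the pattern is visible without a broker, but the same loop structure works with kafka-python's KafkaConsumer:

```python
import queue
import threading

class FakeConsumer:
    """In-memory stand-in for a Kafka consumer (assumption for illustration)."""
    def __init__(self):
        self._q = queue.Queue()

    def put(self, record):
        self._q.put(record)

    def poll(self, timeout=0.1):
        # Return a (possibly empty) batch of records, like a poll call would
        try:
            return [self._q.get(timeout=timeout)]
        except queue.Empty:
            return []

def kafka_listener(consumer):
    """Decorator: invoke the wrapped function for every record the consumer yields."""
    def register(func):
        def loop():
            while True:
                for record in consumer.poll(timeout=0.1):
                    func(record)
        threading.Thread(target=loop, daemon=True).start()
        return func
    return register

consumer = FakeConsumer()
received = []

@kafka_listener(consumer)
def on_message(record):
    received.append(record)

consumer.put("hello")
```

With kafka-python you would swap FakeConsumer for a real KafkaConsumer; note that its poll() returns a dict mapping partitions to record lists, so the inner loop needs a small adjustment.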