 

Is there a Python API for event-driven Kafka consumer?

I have been trying to build a Flask app that has Kafka as its only interface. For this reason, I want to have a Kafka consumer that is triggered when there is a new message in the stream of the concerned topic, and that responds by pushing messages back to the Kafka stream.

I have been looking for something like the Spring implementation:

@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {
    System.out.println("Received Message in group mygroup: " + message);
}

I have looked at:

  1. kafka-python
  2. pykafka
  3. confluent-kafka

But I couldn't find anything related to an event-driven style of implementation in Python.

asked Nov 12 '18 by Shashank

People also ask

Is there a Kafka stream API in Python?

There is no Kafka Streams API in Python yet, but a good alternative would be Faust. The testing behind this answer was executed against 1 ZooKeeper node and 1 Kafka broker installed locally; it is not about performance tuning, so mostly the default configurations provided by the library were used.
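For illustration, here is a minimal Faust sketch of an event-driven consumer, similar in spirit to the Spring listener in the question. The broker address, app name and topic name ('mytopic') are assumptions, not fixed by the question:

# Sketch only; broker address, app name and topic name are assumed values.
import faust

app = faust.App('myapp', broker='kafka://localhost:9092')
mytopic = app.topic('mytopic')

@app.agent(mytopic)
async def listen(stream):
    # Called for every message arriving on the topic, much like @KafkaListener
    async for message in stream:
        print("Received message:", message)

if __name__ == '__main__':
    app.main()  # start a worker: python thisfile.py worker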

What is Kafka-Python and how does it work?

kafka-python is designed to function much like the official Java client, with a sprinkling of Pythonic interfaces. It's best used with Kafka version 0.9+. The first release was in March 2014, and it is actively maintained. Each message is sent via send() asynchronously: when called, it adds the record to a buffer and returns immediately.
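As a concrete illustration of the buffered, asynchronous send() described above, here is a minimal kafka-python producer sketch (the broker address and topic name are assumptions):

# Sketch only; 'localhost:9092' and 'mytopic' are assumed values.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# send() adds the record to an internal buffer and returns a future immediately
future = producer.send('mytopic', key=b'mykey', value=b'hello')

# Block until buffered records have actually been transmitted to the broker
producer.flush()

# Optionally wait for the broker's acknowledgement and inspect the metadata
metadata = future.get(timeout=10)
print(metadata.topic, metadata.partition, metadata.offset)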

What is a Kafka consumer?

The consumer will transparently handle the failure of servers in the Kafka cluster, and adapt as topic partitions are created or migrate between brokers. It also interacts with the assigned Kafka group coordinator node to allow multiple consumers to load balance consumption of topics (requires Kafka >= 0.9.0.0).
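To make the group-coordination part concrete, here is a minimal kafka-python consumer that joins a consumer group, so several instances share the topic's partitions (topic, group id and broker address are assumptions):

# Sketch only; topic, group id and broker address are assumed values.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'mytopic',
    group_id='mygroup',                    # consumers sharing this id split the partitions
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',          # start from the beginning if no committed offset
)

for msg in consumer:
    print(msg.topic, msg.partition, msg.offset, msg.value)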

What is Apache Kafka and how does it work?

We're in a fast-paced and ever-changing environment. Apache Kafka is a distributed streaming platform that can publish, subscribe to, store and process messages in real time. Its pull-based architecture reduces the pressure on services under heavy load and makes it easy to scale.


2 Answers

Here is an implementation of the idea given in @MickaelMaison's answer, using kafka-python.

from kafka import KafkaConsumer
import threading

BOOTSTRAP_SERVERS = ['localhost:9092']

def register_kafka_listener(topic, listener):
    # Poll Kafka in a background thread
    def poll():
        # Initialize the consumer instance
        consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)

        print("About to start polling for topic:", topic)
        consumer.poll(timeout_ms=6000)
        print("Started polling for topic:", topic)
        for msg in consumer:
            print("Key:", msg.key, " Value:", msg.value)
            # Hand the message to the registered listener
            listener(msg)

    print("About to register listener to topic:", topic)
    t1 = threading.Thread(target=poll)
    t1.start()
    print("Started a background thread")

def kafka_listener(data):
    print("Image Ratings:\n", data.value.decode("utf-8"))

register_kafka_listener('topic1', kafka_listener)

The polling is done in a different thread. Once a message is received, the listener is called by passing the data retrieved from Kafka.
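Since the question also wants to respond by pushing messages back to Kafka, the listener can hold a producer and publish a reply from inside the callback. A sketch building on the code above; the response topic name 'responses' and the reply payload are made up for illustration:

# Sketch only; the 'responses' topic and the reply payload are hypothetical.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)

def replying_listener(data):
    text = data.value.decode("utf-8")
    print("Image Ratings:\n", text)
    # Push a reply back onto another topic
    producer.send('responses', value=("processed: " + text).encode("utf-8"))

register_kafka_listener('topic1', replying_listener)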

answered Oct 08 '22 by Shashank

Kafka consumers have to continuously poll to retrieve data from the brokers.

Spring gives you this fancy API but under the covers, it calls poll in a loop and only invokes your method when records have been retrieved.

You can easily build something similar with any of the Python clients you've mentioned. Like in Java, this is not an API directly exposed by (most) Kafka clients but instead something provided by a layer on top. It's something you need to build.
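For example, here is a hand-rolled poll loop with confluent-kafka that invokes a callback per record, roughly what Spring does under the covers. The broker address, group id and topic are assumptions, and this is a sketch rather than an API provided by the library:

# Sketch of a hand-rolled poll loop; broker, group id and topic are assumed values.
from confluent_kafka import Consumer

def run_listener(topic, handler):
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest',
    })
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)     # wait up to 1s for a record
            if msg is None:
                continue                 # nothing arrived, poll again
            if msg.error():
                print("Consumer error:", msg.error())
                continue
            handler(msg)                 # the "event-driven" callback
    finally:
        consumer.close()

run_listener('mytopic', lambda m: print(m.key(), m.value()))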

answered Oct 08 '22 by Mickael Maison