How to subscribe to multiple Kafka wildcard patterns using kafka-python?

I'm subscribing to Kafka using a pattern with a wildcard, as shown below. The wildcard represents a dynamic customer id.

consumer.subscribe(pattern='customer.*.validations') 

This works well, because I can pluck the customer ID from the topic string. But now I need to extend the functionality to listen to a similar topic for a slightly different purpose. Let's call it customer.*.additional-validations. The code needs to live in the same project because so much functionality is shared, but I need to be able to take a different path depending on which kind of topic a message came from.
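A minimal sketch of the topic parsing and routing described above, assuming topic names of the form customer.<id>.validations and customer.<id>.additional-validations where the id itself contains no dots. The names TOPIC_RE, parse_topic and handle, and the returned strings, are hypothetical and only for illustration:

```python
import re

# Hypothetical topic layout: customer.<customer_id>.<kind>
# Dots are escaped and the id may not contain a dot, so the two kinds stay distinct.
TOPIC_RE = re.compile(r"customer\.([^.]+)\.(validations|additional-validations)")


def parse_topic(topic):
    """Return (customer_id, kind) for a matching topic, or None otherwise."""
    m = TOPIC_RE.fullmatch(topic)
    if m is None:
        return None
    return m.group(1), m.group(2)


def handle(topic):
    """Pick a code path based on the kind of topic (placeholder return values)."""
    parsed = parse_topic(topic)
    if parsed is None:
        return "ignored"
    customer_id, kind = parsed
    if kind == "validations":
        return f"validate {customer_id}"
    return f"additional-validate {customer_id}"
```

In a kafka-python consumer loop, each record's msg.topic would be passed to handle, so both kinds of topic can share one consumer and most of the code.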

In the kafka-python documentation I can see that it is possible to subscribe to a list of topics. However, these are hard-coded strings, not patterns that allow for flexibility.

>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)

So I'm wondering whether it is possible to combine the two somehow, something like this (which does not work):

consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations']) 
asked Sep 15 '16 20:09 by Ben Harrison





1 Answer

The KafkaConsumer code supports either a list of topics or a pattern:

https://github.com/dpkp/kafka-python/blob/68c8fa4ad01f8fef38708f257cb1c261cfac01ab/kafka/consumer/group.py#L717

    def subscribe(self, topics=(), pattern=None, listener=None):
        """Subscribe to a list of topics, or a topic regex pattern
        Partitions will be dynamically assigned via a group coordinator.
        Topic subscriptions are not incremental: this list will replace the
        current assignment (if there is one).

So you can combine the two into a single regex using the OR operator |. Because kafka-python matches topic names against the pattern with Python's re module, one alternation like the following subscribes to both sets of dynamic topics:

(customer.*.validations)|(customer.*.additional-validations)
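One caveat worth checking: in a regex, . matches any character, so customer.*.validations on its own already matches customer.42.additional-validations (the .* absorbs ".42.additional" and the . absorbs the "-"). kafka-python compiles the pattern with re.compile and, as far as I can tell from its source, tests each topic with .match, which anchors only at the start of the name. The sketch below uses plain re (no broker needed) to join a list of patterns into one alternation and to compare the loose patterns from the question with stricter, escaped ones. combine_patterns is a hypothetical helper, not part of kafka-python:

```python
import re


def combine_patterns(patterns):
    """Join several regexes into one alternation, each in a non-capturing group."""
    return "|".join(f"(?:{p})" for p in patterns)


# The patterns as written in the question: unescaped dots, no end anchor.
loose = combine_patterns([
    "customer.*.validations",
    "customer.*.additional-validations",
])

# Escaped dots, no dots allowed in the id, and a trailing $ so longer names don't match.
strict = combine_patterns([
    r"customer\.[^.]+\.validations$",
    r"customer\.[^.]+\.additional-validations$",
])

topics = [
    "customer.42.validations",
    "customer.42.additional-validations",
    "customer.42.validations-archive",
    "customerX42Yvalidations",
]

# re.match anchors at the start only, mirroring a match()-style topic check.
loose_hits = [t for t in topics if re.match(loose, t)]
strict_hits = [t for t in topics if re.match(strict, t)]
```

With these inputs every topic matches the loose pattern, while only the first two match the strict one. Either string could be passed as consumer.subscribe(pattern=...); the subscription does not report which alternative matched, so routing still has to be done per record by inspecting msg.topic.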

answered Sep 16 '22 14:09 by DhruvPathak
