
How to do content filtering with Apache Kafka?

I have a topic named mytopic. There is one producer and there are two consumers for this topic. What I need to do is filter the messages produced by the producer according to their prefix. For example, if a message starts with the prefix 'a', then only the first consumer must receive it. If it starts with the prefix 'b', then only the second consumer must receive it.

I searched a lot, but what I found was filtering messages that come from a topic and then sending them to different topics after filtering. As said above, I need to do the filtering over one topic. How can I do that in Kafka?

JollyRoger asked Mar 06 '23 18:03



2 Answers

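Let both consumers consume all the data (give each one its own group.id, since consumers in the same group split the topic's partitions between them), and once you get the records, filter them with a Java stream using the filter logic specific to each consumer.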

In short, just consume the data as is and filter it in Java code instead of doing it at the Kafka level.
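A minimal sketch of that consumer-side filtering in plain Java, assuming string message values. The class and method names (PrefixFilter, filterByPrefix) are hypothetical, and the polled values are simulated with a list; in a real consumer you would collect record.value() from each ConsumerRecord returned by consumer.poll(...) and pass those in.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: the filtering step each consumer runs after poll().
final class PrefixFilter {

    // Keeps only the message values that start with the given prefix.
    // Null values (e.g. tombstone records) are skipped.
    static List<String> filterByPrefix(List<String> values, String prefix) {
        return values.stream()
                .filter(v -> v != null && v.startsWith(prefix))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Stand-in for the values of the records returned by consumer.poll(...).
        List<String> polled = List.of("apple", "banana", "avocado", "blueberry");

        // Consumer 1 keeps 'a' messages, consumer 2 keeps 'b' messages.
        System.out.println(filterByPrefix(polled, "a")); // [apple, avocado]
        System.out.println(filterByPrefix(polled, "b")); // [banana, blueberry]
    }
}
```

The trade-off is that every consumer still fetches every message from the broker; the filter only decides which ones it processes.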

Update:

If you want to filter at the Kafka level, you can use partitions: when sending messages to the Kafka topic, send messages with prefix 'a' to one partition (e.g. partition 0) and messages with prefix 'b' to another (e.g. partition 1). The topic needs at least two partitions for this.

Then, when consuming, have each consumer read only its own partition (for example with the 'assign' method of KafkaConsumer).
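A sketch of the producer-side routing rule, assuming 'mytopic' has at least two partitions and using partition numbers 0 and 1 (Kafka numbers partitions from 0). PrefixPartitioning and partitionFor are hypothetical names; the returned number is what you would pass as the partition argument of the ProducerRecord(topic, partition, key, value) constructor from kafka-clients.

```java
// Hypothetical routing rule: maps a message to a partition by its prefix.
final class PrefixPartitioning {

    // Returns the partition number a message should be sent to.
    static int partitionFor(String message) {
        if (message.startsWith("a")) {
            return 0; // read only by the first consumer
        }
        if (message.startsWith("b")) {
            return 1; // read only by the second consumer
        }
        throw new IllegalArgumentException("No partition defined for: " + message);
    }

    public static void main(String[] args) {
        for (String msg : new String[] {"apple", "banana"}) {
            // With kafka-clients this value would be the partition argument of
            // new ProducerRecord<>("mytopic", partition, key, msg).
            System.out.println(msg + " -> partition " + partitionFor(msg));
        }
    }
}
```

On the consumer side, the first consumer would call consumer.assign(List.of(new TopicPartition("mytopic", 0))) and the second the same with partition 1. Throwing for unknown prefixes is a choice made for this sketch; a default partition is an alternative.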

Sahil Chhabra answered Mar 12 '23 23:03



That's easy, and you don't need to write back to different topics.
Does '2 consumers' mean 2 consumer groups, or 2 consumer threads in 1 consumer group?
I will cover both cases.

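If it is 2 consumer threads in 1 consumer group, you can use the message 'Key' field.
Kafka sends messages with the same 'Key' to the same 'Partition'.
For example, give messages with prefix 'a' the key 'a' and messages with prefix 'b' the key 'b'; Kafka will then send all 'a' messages to one partition and all 'b' messages to one partition. With the default partitioner, two different keys are not guaranteed to land on different partitions, so check where 'a' and 'b' end up (or set the partition explicitly). Consumer thread A can then read only the 'a' partition of 'mytopic', and thread B only the 'b' partition, using the 'assign' method of class 'org.apache.kafka.clients.consumer.KafkaConsumer' (each thread needs its own KafkaConsumer instance, since KafkaConsumer is not thread-safe).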
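An illustration of why key-based routing needs that check. This is not Kafka's actual algorithm: Kafka's default partitioner hashes the serialized key bytes with murmur2, while this sketch uses String.hashCode() as a stand-in. The class and method names are hypothetical. It shows the two properties that matter: a given key always maps to the same partition, but distinct keys can share one.

```java
// Illustration only: a stand-in for hash-based key partitioning.
// Kafka's default partitioner uses murmur2 on the serialized key;
// String.hashCode() is used here just to show the same pattern.
final class KeyPartitioningDemo {

    // Same key -> same partition, but distinct keys may collide.
    static int partitionForKey(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 2;
        for (String key : new String[] {"a", "b", "c"}) {
            System.out.println(key + " -> partition " + partitionForKey(key, numPartitions));
        }
        // Prints a -> 1, b -> 0, c -> 1: keys "a" and "c" collide.
    }
}
```

With two partitions, 'a' and 'c' collide under this stand-in hash, so if more prefixes are added later, routing by key alone may mix them on one partition.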

If it is 2 consumer groups, just subscribe to the topic and filter in the code. If that is unsatisfactory, use the same method as above.
The trick is sending messages with a specific prefix to a specific 'Partition'.
If you really want filtering, maybe you can use a Kafka Connect plugin.

songxin answered Mar 12 '23 23:03
