
How to send and consume json messages using confluent-kafka in Python

I am fairly new to Python and just getting started with Kafka. I have set up a Kafka broker and am trying to communicate with it using confluent-kafka. I have been able to produce and consume simple messages with it; however, I have some Django objects that I need to serialize and send to Kafka.

Previously I was using kafka-python, with which I was able to send and consume JSON messages; however, I was having some weird issues with it.

#Producer.py

import sys
import traceback

# p is a confluent_kafka.Producer and delivery_callback is its delivery
# report handler; both are assumed to be defined elsewhere in this module.

def send_message(topic, message):
    try:
        try:
            p.produce(topic, message, callback=delivery_callback)
        except BufferError:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
        # Serve delivery callback queue.
        # NOTE: Since produce() is an asynchronous API this poll() call
        #       will most likely not serve the delivery callback for the
        #       last produce()d message.
        p.poll(0)
        # Wait until all messages have been delivered
        sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
        p.flush()
    except Exception:
        print(traceback.format_exc())

#Consumer.py

import sys
import traceback

from confluent_kafka import Consumer, KafkaException

conf = {'bootstrap.servers': "localhost:9092", 'group.id': 'test', 'session.timeout.ms': 6000,
        'auto.offset.reset': 'earliest'}
c = Consumer(conf)
c.subscribe(["mykafka"])
try:
    while True:
        msg = c.poll(timeout=1.0)
        if msg is None:
            # No message arrived within the timeout; poll again.
            continue
        if msg.error():
            raise KafkaException(msg.error())
        else:
            sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                (msg.topic(), msg.partition(), msg.offset(),
                                str(msg.key())))
            print(msg.value())
except Exception:
    print(traceback.format_exc())
finally:
    c.close()

I serialize my Django model objects like this:

from django.core import serializers
# assuming obj is a model instance
serialized_obj = serializers.serialize('json', [ obj, ])

So what changes do I need to make in my producer and consumer to produce and consume JSON messages?

Paras asked Apr 14 '19 06:04




1 Answer

For the producer, try passing the serialized string straight to your send_message function:

send_message(topic, serialized_obj)
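
As a rough sketch of what that call ends up sending: produce() in confluent-kafka takes a str or bytes value, so the JSON string from Django's serializer can go in as is. If you would rather be explicit about the encoding, a small wrapper along these lines should work. The names send_json and FakeProducer are made up for illustration (FakeProducer only exists so the snippet runs without a broker); with a real broker you would pass a confluent_kafka.Producer instead.

```python
import json


def send_json(producer, topic, payload, callback=None):
    """Encode payload as UTF-8 JSON bytes and hand it to producer.produce().

    payload may be an already-serialized JSON string (for example the output
    of django.core.serializers.serialize) or any json.dumps-compatible object.
    """
    if not isinstance(payload, str):
        payload = json.dumps(payload)
    value = payload.encode("utf-8")
    producer.produce(topic, value=value, callback=callback)
    producer.poll(0)  # serve delivery callbacks from earlier produce() calls
    return value


class FakeProducer:
    """Hypothetical stand-in for confluent_kafka.Producer (no broker needed)."""

    def __init__(self):
        self.sent = []

    def produce(self, topic, value=None, callback=None):
        self.sent.append((topic, value))

    def poll(self, timeout):
        return 0


fake = FakeProducer()
# A string payload shaped like Django's JSON serializer output (assumed sample).
send_json(fake, "mykafka", '[{"model": "app.item", "pk": 1, "fields": {"name": "x"}}]')
# A plain dict is serialized with json.dumps first.
send_json(fake, "mykafka", {"pk": 2})
```

Either way, what lands on the topic is a UTF-8 encoded JSON document, which is what the consumer side below has to undo.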

In the consumer, msg.value() returns bytes, so you only need to decode them to get the JSON string back:

print(msg.value().decode('utf8'))

If you need Python objects (lists and dicts) rather than a JSON string, pass the decoded string to json.loads.
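
A minimal sketch of that last step, assuming the payload has the shape Django's JSON serializer produces (a list with one dict per object, each carrying "model", "pk" and "fields"). The helper name decode_json_message and the sample bytes are illustrative, not part of confluent-kafka:

```python
import json


def decode_json_message(raw):
    """Turn the bytes returned by msg.value() into Python objects."""
    return json.loads(raw.decode("utf-8"))


# Assumed sample payload, standing in for msg.value() from the consumer loop.
raw = b'[{"model": "app.item", "pk": 1, "fields": {"name": "widget"}}]'

records = decode_json_message(raw)
for record in records:
    # Each record is a plain dict; model fields live under "fields".
    print(record["pk"], record["fields"]["name"])
```

In the consumer loop from the question, this would replace print(msg.value()) with something like records = decode_json_message(msg.value()).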

OneCricketeer answered Oct 20 '22 04:10