How to send and consume JSON messages using confluent-kafka in Python

I am fairly new to Python and just getting started with Kafka. I have set up a Kafka broker and am trying to communicate with it using confluent-kafka. I have been able to produce and consume simple messages with it; however, I have some Django objects that I need to serialize and send to Kafka.

Previously I was using kafka-python, with which I was able to send and consume JSON messages, but I was having some weird issues with it.


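import sys
from confluent_kafka import Producer

# Assumed setup: the producer object `p` was not shown in the original post.
p = Producer({'bootstrap.servers': 'localhost:9092'})

def send_message(topic, message):
    try:
        try:
            # Queue the message for asynchronous delivery.
            p.produce(topic, message)
        except BufferError:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
        # Serve delivery callback queue.
        # NOTE: Since produce() is an asynchronous API this poll() call
        #       will most likely not serve the delivery callback for the
        #       last produce()d message.
        p.poll(0)
        # Wait until all messages have been delivered
        sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
        p.flush()
    except Exception:
        import traceback
        traceback.print_exc()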


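import sys
from confluent_kafka import Consumer, KafkaException
conf = {'bootstrap.servers': "localhost:9092", 'group.id': 'test', 'session.timeout.ms': 6000,
        'auto.offset.reset': 'earliest'}
c = Consumer(conf)
# Assumed: `topic` is the same topic name that is passed to send_message().
c.subscribe([topic])
try:
    while True:
        msg = c.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                         (msg.topic(), msg.partition(), msg.offset(),
                          str(msg.key())))
        print(msg.value())
except Exception:
    import traceback
    traceback.print_exc()
finally:
    c.close()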

I serialize my Django model objects like this:

from django.core import serializers
# assuming obj is a model instance
serialized_obj = serializers.serialize('json', [ obj, ])

So what changes do I need to make in my producer and consumer to produce and consume JSON messages?

1 Answer

For the producer, try passing the serialized string directly:

send_message(topic, serialized_obj)
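
As a rough sketch of what that call hands to Kafka: serializers.serialize('json', ...) already returns a JSON string, and produce() accepts either a str or bytes value, so no extra conversion is strictly required. Encoding to UTF-8 yourself just makes the wire format explicit. The helper name to_kafka_value and the sample model label "app.item" below are made up for illustration and are not part of confluent-kafka or the question.

```python
import json


def to_kafka_value(payload):
    """Turn a payload into UTF-8 bytes usable as a Kafka message value.

    Hypothetical helper for illustration; not part of confluent-kafka.
    """
    if isinstance(payload, bytes):
        return payload                       # already encoded
    if isinstance(payload, str):
        return payload.encode("utf-8")       # e.g. output of serializers.serialize
    # Plain dicts/lists are dumped to a JSON string first.
    return json.dumps(payload).encode("utf-8")


# Stand-in for the string Django's JSON serializer would return
# (the model label "app.item" is an assumption).
serialized_obj = '[{"model": "app.item", "pk": 1, "fields": {"name": "demo"}}]'
value = to_kafka_value(serialized_obj)
# send_message(topic, value)  # send_message and topic as defined in the question
```

With this in place, the producer side stays the same whether the payload starts out as a Django-serialized string or a plain dict.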

In the consumer, msg.value() returns bytes, so you would decode those bytes to a plain string, for example with msg.value().decode('utf-8').
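
The decoding step can be sketched as a small helper. This assumes the producer sent UTF-8 text; value_as_text is a hypothetical name, and the sample bytes stand in for what msg.value() would return inside the consumer loop.

```python
def value_as_text(raw_value):
    """Decode a Kafka message value (bytes) into a str, assuming UTF-8."""
    if raw_value is None:        # a message may carry no payload at all
        return None
    return raw_value.decode("utf-8")


# In the consumer loop this would be: text = value_as_text(msg.value())
raw = b'[{"model": "app.item", "pk": 1, "fields": {"name": "demo"}}]'
text = value_as_text(raw)
print(text)
```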


If you need JSON objects rather than a string, you can then pass the decoded string to json.loads.
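
A minimal sketch of that last step, assuming the payload has the list-of-records shape that Django's JSON serializer produces (model, pk, fields). The function name parse_json_value and the sample data are illustrative only.

```python
import json


def parse_json_value(raw_value):
    """Decode a UTF-8 Kafka message value and parse it as JSON."""
    text = raw_value.decode("utf-8")
    return json.loads(text)


# Stand-in for msg.value() as received by the consumer.
raw = b'[{"model": "app.item", "pk": 1, "fields": {"name": "demo"}}]'
records = parse_json_value(raw)
for record in records:
    # Each record is a plain dict, not a model instance.
    print(record["model"], record["pk"], record["fields"])
```

This yields ordinary dicts and lists. If you want model instances back instead, Django's serializers.deserialize('json', text) is the counterpart to serializers.serialize.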

