I am fairly new to Python and am getting started with Kafka. I have set up a Kafka broker and am trying to communicate with it using confluent-kafka. I have been able to produce and consume simple messages with it; however, I have some Django objects that I need to serialize and send to Kafka.
Previously I was using kafka-python, with which I was able to send and consume JSON messages; however, I was having some weird issues with it.
#Producer.py
def send_message(topic, message):
    try:
        try:
            p.produce(topic, message, callback=delivery_callback)
        except BufferError as b:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
        # Serve delivery callback queue.
        # NOTE: Since produce() is an asynchronous API this poll() call
        # will most likely not serve the delivery callback for the
        # last produce()d message.
        p.poll(0)
        # Wait until all messages have been delivered
        sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
        p.flush()
    except Exception as e:
        import traceback
        print(traceback.format_exc())
#Consumer.py
import sys
from confluent_kafka import Consumer, KafkaException

conf = {'bootstrap.servers': "localhost:9092", 'group.id': 'test', 'session.timeout.ms': 6000,
        'auto.offset.reset': 'earliest'}
c = Consumer(conf)
c.subscribe(["mykafka"])
try:
    while True:
        msg = c.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        else:
            sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                             (msg.topic(), msg.partition(), msg.offset(),
                              str(msg.key())))
            print(msg.value())
except Exception as e:
    import traceback
    print(traceback.format_exc())
finally:
    c.close()
I serialize my Django model objects like this:
from django.core import serializers
# assuming obj is a model instance
serialized_obj = serializers.serialize('json', [ obj, ])
So what changes do I need to make in my producer and consumer to produce and consume JSON messages?
For the producer, try passing the serialized string directly:
send_message(topic, serialized_obj)
On the consumer side, msg.value() returns bytes, so decode them to get the string back:
print(msg.value().decode('utf8'))
If you need JSON objects rather than a string, you can then parse the decoded string with json.loads.
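To make the round trip concrete, here is a minimal sketch of the encode and decode steps with no broker involved. The helper names to_kafka_value and from_kafka_value are hypothetical, and the sample string only imitates the list-of-records shape that serializers.serialize('json', [obj]) returns; the model name and field are made up for illustration.

```python
import json


def to_kafka_value(serialized_obj):
    """Encode the JSON string from serializers.serialize() as UTF-8 bytes.

    Hypothetical helper: produce() also accepts a str, so this step is
    optional and only makes the encoding explicit.
    """
    return serialized_obj.encode("utf-8")


def from_kafka_value(raw):
    """Turn the bytes returned by msg.value() back into Python objects."""
    return json.loads(raw.decode("utf-8"))


# Made-up stand-in for serializers.serialize('json', [obj]).
sample = '[{"model": "app.item", "pk": 1, "fields": {"name": "widget"}}]'

payload = to_kafka_value(sample)      # what you would pass to send_message()
records = from_kafka_value(payload)   # what you would do with msg.value()
print(records[0]["fields"]["name"])
```

In the consumer loop this would replace print(msg.value()) with something like records = from_kafka_value(msg.value()). Since Django's JSON serializer emits a list, records is a list of dicts even when a single object was sent.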