I'm new to Apache Kafka. I'm trying to send messages as JSON objects to a Kafka topic using Python 2.7, but I get an "AssertionError: Value must be bytes" error. I can send messages as strings successfully, and I can see them with kafka-console-consumer.sh. I'm using Apache Kafka version 2.10-0.8.2.1. My code is below.
from kafka import KafkaProducer
import yaml
producer = KafkaProducer(bootstrap_servers="localhost:9092")
msg = yaml.safe_load('{"id":1, "name":"oguz"}')
producer.send("my-topic", msg)
Thanks for your help.
yaml.safe_load() returns a dict, so two things are required to turn it into bytes: serialize it to a string via JSON, and then encode that string to bytes as UTF-8.
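To make the two steps concrete, here is a minimal sketch that uses only the standard library and needs no broker. The names text and payload are just illustrative, and the dict literal stands in for what yaml.safe_load() returns for the string in the question.

```python
import json

# The same structure yaml.safe_load('{"id":1, "name":"oguz"}') produces: a plain dict.
msg = {"id": 1, "name": "oguz"}

# Step 1: serialize the dict to a JSON string.
text = json.dumps(msg)

# Step 2: encode that string to bytes as UTF-8.
payload = text.encode("utf-8")

# payload is now a byte string, which is what the producer's assertion checks for.
print(type(payload))
```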
Taken from the examples in the kafka-python docs, you can use the value_serializer keyword argument when you instantiate the KafkaProducer:
>>> import json
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('my-topic', msg)
Alternatively, you could just serialize it manually when you call send():
>>> producer.send('my-topic', json.dumps(msg).encode('utf-8'))
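Whoever reads the topic has to run the same two steps in reverse: decode the bytes as UTF-8, then parse the JSON. The sketch below is only an illustration; encode_value and decode_value are hypothetical helper names, not part of kafka-python. It checks the round trip locally without talking to Kafka.

```python
import json

def encode_value(value):
    # dict -> JSON string -> UTF-8 bytes (same logic as the lambda above)
    return json.dumps(value).encode("utf-8")

def decode_value(raw):
    # UTF-8 bytes -> JSON string -> Python object
    return json.loads(raw.decode("utf-8"))

msg = {"id": 1, "name": "oguz"}
raw = encode_value(msg)

# The round trip should give back an equal dict.
assert decode_value(raw) == msg
```

A named function like encode_value can be passed as value_serializer in place of the lambda. kafka-python's KafkaConsumer accepts a matching value_deserializer keyword, so decode_value could plausibly be used there, though that is not tested here.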