I am trying to send a very simple JSON object through Kafka and read it out the other side using Python and kafka-python. However, I keep seeing the following error:
2017-04-07 10:28:52,030.30.9998989105:kafka.future:8228:ERROR:10620:Error processing callback
Traceback (most recent call last):
  File "C:\Anaconda2\lib\site-packages\kafka\future.py", line 79, in _call_backs
    f(value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 760, in _handle_fetch_response
    unpacked = list(self._unpack_message_set(tp, messages))
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 539, in _unpack_message_set
    tp.topic, msg.value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 570, in _deserialize
    return f(bytes_)
  File "C:\Users\myUser\workspace\PythonKafkaTest\src\example.py", line 55, in <lambda>
    value_deserializer=lambda m: json.loads(m).decode('utf-8'))
  File "C:\Anaconda2\lib\json\__init__.py", line 339, in loads
    return _default_decoder.decode(s)
  File "C:\Anaconda2\lib\json\decoder.py", line 364, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "C:\Anaconda2\lib\json\decoder.py", line 382, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded
I've done some research, and the most common cause of this error is malformed JSON. To check that, I printed the JSON before sending it by adding the following to my producer loop, and it prints without errors:
while True:
    json_obj1 = json.dumps({"dataObjectID": "test1"})
    print json_obj1
    producer.send('my-topic', {"dataObjectID": "test1"})
    producer.send('my-topic', {"dataObjectID": "test2"})
    time.sleep(1)
This leads me to suspect that I can produce the json, but not consume it.
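One way to test that suspicion without a broker is to call the two lambdas from the code directly on the same value. This is a minimal sketch that assumes only the standard json module (written for Python 3; the variable names are illustrative). On a valid payload the consumer lambda fails with AttributeError, because json.loads has already returned a dict and a dict has no decode() method. The ValueError in the traceback therefore suggests that json.loads itself was handed bytes that are not JSON.

```python
import json

# The two lambdas exactly as written in the question's code.
value_serializer = lambda v: json.dumps(v).encode('utf-8')      # producer side
value_deserializer = lambda m: json.loads(m).decode('utf-8')    # consumer side

payload = value_serializer({"dataObjectID": "test1"})
print(repr(payload))  # the raw bytes the producer hands to Kafka

# The producer's bytes are valid JSON once decoded back to text...
print(json.loads(payload.decode('utf-8')))

# ...but the consumer's lambda still fails on these same bytes.
try:
    value_deserializer(payload)
except (ValueError, AttributeError) as exc:
    print('consumer deserializer failed: %s' % type(exc).__name__)
```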
Here is my code:
import threading
import logging
import time
import json
from kafka import KafkaConsumer, KafkaProducer
class Producer(threading.Thread):
    daemon = True
    def run(self):
        producer = KafkaProducer(bootstrap_servers='localhost:9092',
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))
        while True:
            producer.send('my-topic', {"dataObjectID": "test1"})
            producer.send('my-topic', {"dataObjectID": "test2"})
            time.sleep(1)
class Consumer(threading.Thread):
    daemon = True
    def run(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 value_deserializer=lambda m: json.loads(m).decode('utf-8'))
        consumer.subscribe(['my-topic'])
        for message in consumer:
            print(message)
def main():
    threads = [
        Producer(),
        Consumer()
    ]
    for t in threads:
        t.start()
    time.sleep(10)
if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:' +
               '%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()
I can successfully send and receive strings if I remove both the value_serializer and the value_deserializer. When I run that version, I can see the JSON I am sending. Here is a short snippet of the output:
ConsumerRecord(topic=u'my-topic', partition=0, offset=5742, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test1"}', checksum=-1301891455, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5743, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test2"}', checksum=-1340077864, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5744, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5745, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5746, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5747, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)
I also tried removing only the value_deserializer from the consumer. That code runs, but without the deserializer each message value comes out as a plain string, which isn't what I need. So why doesn't the value_deserializer work? Is there a different way I should be getting the JSON out of a Kafka message?
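The output above also shows records in my-topic that are not JSON at all: offsets 5744 to 5747 hold 'test' and '\xc2Hello, stranger!', presumably left over from earlier tests. Since the consumer is created without a group_id and with auto_offset_reset='earliest', it most likely starts from the earliest offset still in the topic on every run, so any strict json.loads deserializer will eventually be handed these values and raise a ValueError. The sketch below uses a hypothetical helper, safe_json_deserializer (not part of kafka-python), that returns None instead of raising; reading from a fresh topic is the cleaner alternative.

```python
import json

def safe_json_deserializer(m):
    """Hypothetical helper: parse UTF-8 JSON bytes, or return None if they are not JSON."""
    if m is None:
        return None
    try:
        return json.loads(m.decode('utf-8'))
    except ValueError:
        # JSON syntax errors and UnicodeDecodeError are both ValueError subclasses
        return None

# Raw values copied from the ConsumerRecord output in the question.
raw_values = [
    b'{"dataObjectID": "test1"}',
    b'test',
    b'\xc2Hello, stranger!',
]
for raw in raw_values:
    print(repr(raw), '->', safe_json_deserializer(raw))
```

It would be passed as value_deserializer=safe_json_deserializer. Note that silently turning bad records into None hides them, so the consumer loop should check for None if that matters.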
My problem was solved by decoding the message from UTF-8 first and then parsing it with json.loads:
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
instead of:
value_deserializer=lambda m: json.loads(m).decode('utf-8')
I hope this also works on the producer's side.
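The same fix can be sketched as a pair of named functions, which makes the order of operations explicit: the producer serializes to JSON text and then encodes to bytes, and the consumer decodes the bytes and then parses the JSON. The names json_serializer and json_deserializer are illustrative, not part of kafka-python.

```python
import json

def json_serializer(v):
    """Producer side: Python object -> JSON text -> UTF-8 bytes."""
    return json.dumps(v).encode('utf-8')

def json_deserializer(m):
    """Consumer side: UTF-8 bytes -> JSON text -> Python object (decode first, then parse)."""
    return json.loads(m.decode('utf-8'))

original = {"dataObjectID": "test1"}
wire_bytes = json_serializer(original)      # what the producer would send
restored = json_deserializer(wire_bytes)    # what the consumer would yield as message.value
print(type(restored).__name__, restored == original)
```

In kafka-python these would be passed as KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=json_serializer) and KafkaConsumer(bootstrap_servers='localhost:9092', value_deserializer=json_deserializer). The producer in the question already applies dumps and then encode, which is the mirror image of this consumer fix.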
It turns out the problem is the .decode('utf-8') part of value_deserializer=lambda m: json.loads(m).decode('utf-8'). By that point json.loads has already turned the message into a Python object, and a dict has no decode() method.
When I change it to value_deserializer=lambda m: json.loads(m), the object read from Kafka is a dictionary, which is correct according to the following conversion table from Python's (2.x) json documentation:
| JSON          | Python    |
|---------------|-----------|
| object        | dict      |
| array         | list      |
| string        | unicode   |
| number (int)  | int, long |
| number (real) | float     |
| true          | True      |
| false         | False     |
| null          | None      |
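A quick way to check the conversions in the table above is to parse one sample of each JSON type. This sketch is written for Python 3, where the names differ slightly from the Python 2 table: strings come back as str, and integers as int (there is no separate long). Passing bytes straight to json.loads, as the json.loads(m) lambda does, works on Python 2 and on Python 3.6 and later; earlier Python 3 versions require decoding to str first.

```python
import json

# One sample document per JSON type from the table above.
samples = ['{"dataObjectID": "test1"}', '[1, 2]', '"text"', '42', '3.14', 'true', 'false', 'null']
for s in samples:
    print('%-26s -> %s' % (s, type(json.loads(s)).__name__))

# Python 3.6+: json.loads also accepts UTF-8 bytes, such as a raw Kafka message value.
print(json.loads(b'{"dataObjectID": "test1"}'))
```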