Can't Consume JSON Messages From Kafka Using Kafka-Python's Deserializer

I am trying to send a very simple JSON object through Kafka and read it back out on the other side using Python and kafka-python. However, I keep seeing the following error:

2017-04-07 10:28:52,030.30.9998989105:kafka.future:8228:ERROR:10620:Error processing callback
Traceback (most recent call last):
  File "C:\Anaconda2\lib\site-packages\kafka\future.py", line 79, in _call_backs
    f(value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 760, in _handle_fetch_response
    unpacked = list(self._unpack_message_set(tp, messages))
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 539, in _unpack_message_set
    tp.topic, msg.value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 570, in _deserialize
    return f(bytes_)
  File "C:\Users\myUser\workspace\PythonKafkaTest\src\example.py", line 55, in <lambda>
    value_deserializer=lambda m: json.loads(m).decode('utf-8'))
  File "C:\Anaconda2\lib\json\__init__.py", line 339, in loads
    return _default_decoder.decode(s)
  File "C:\Anaconda2\lib\json\decoder.py", line 364, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "C:\Anaconda2\lib\json\decoder.py", line 382, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded

I've done some research, and the most common cause of this error is malformed JSON. I tried printing out the JSON before sending it by adding the following to my code, and it prints with no errors:

while True:
    json_obj1 = json.dumps({"dataObjectID": "test1"})
    print json_obj1
    producer.send('my-topic', {"dataObjectID": "test1"})
    producer.send('my-topic', {"dataObjectID": "test2"})
    time.sleep(1)

This leads me to suspect that I can produce the JSON, but not consume it.
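
To rule out the serialization step on its own, the same serializer lambda used below round-trips cleanly outside of Kafka (a minimal standalone check):

import json

# same lambda as the producer's value_serializer in the code below
serialize = lambda v: json.dumps(v).encode('utf-8')

payload = serialize({"dataObjectID": "test1"})
print(repr(payload))                        # '{"dataObjectID": "test1"}' on Python 2
print(json.loads(payload.decode('utf-8')))  # round-trips back to a dict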

Here is my code:

import threading
import logging
import time
import json

from kafka import KafkaConsumer, KafkaProducer


class Producer(threading.Thread):
    daemon = True

    def run(self):
        producer = KafkaProducer(bootstrap_servers='localhost:9092',
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))

        while True:
            producer.send('my-topic', {"dataObjectID": "test1"})
            producer.send('my-topic', {"dataObjectID": "test2"})
            time.sleep(1)


class Consumer(threading.Thread):
    daemon = True

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 value_deserializer=lambda m: json.loads(m).decode('utf-8'))
        consumer.subscribe(['my-topic'])

        for message in consumer:
            print (message)


def main():
    threads = [
        Producer(),
        Consumer()
    ]

    for t in threads:
        t.start()

    time.sleep(10)

if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:' +
               '%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()

I can successfully send and receive strings if I remove the value_serializer and value_deserializer. When I run that code I can see the JSON I am sending. Here is a short snippet:

ConsumerRecord(topic=u'my-topic', partition=0, offset=5742, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test1"}', checksum=-1301891455, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5743, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test2"}', checksum=-1340077864, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5744, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5745, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5746, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5747, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)

So I tried removing the value_deserializer from the consumer. That code executes, but without the deserializer the message comes out as a string, which isn't what I need. So why doesn't the value_deserializer work? Is there a different way to get the JSON out of a Kafka message that I should be using?
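
Without the deserializer, the parsing has to happen manually on each record, along these lines (a sketch, assuming every payload on the topic is valid JSON, which the 'Hello, stranger!' records above are not):

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')
consumer.subscribe(['my-topic'])

for message in consumer:
    # message.value is the raw string here, since no value_deserializer is set
    obj = json.loads(message.value)
    print(obj["dataObjectID"])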

asked Apr 07 '17 by jencoston



2 Answers

My problem was solved by decoding the message to UTF-8 first, and then passing it to json.loads:

value_deserializer=lambda m: json.loads(m.decode('utf-8'))

instead of:

value_deserializer=lambda m: json.loads(m).decode('utf-8')

Hopefully this also works on the producer's side.
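
For reference, a minimal version of the question's consumer with the corrected deserializer (same bootstrap server and topic as in the question):

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         # decode the raw bytes to a UTF-8 string first,
                         # then parse that string as JSON
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['my-topic'])

for message in consumer:
    print(message.value)  # now a dict such as {u'dataObjectID': u'test1'}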

answered by Christoph Schranz


It turns out the problem is the decode portion of value_deserializer=lambda m: json.loads(m).decode('utf-8'). When I change it to value_deserializer=lambda m: json.loads(m), the object being read from Kafka is a dictionary, which is correct according to the conversion table in Python's JSON documentation (see the short check after the table):

JSON          | Python
--------------|-----------
object        | dict
array         | list
string        | unicode
number (int)  | int, long
number (real) | float
true          | True
false         | False
null          | None
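
A quick check of that mapping:

import json

value = json.loads('{"dataObjectID": "test1"}')
print(type(value))            # <type 'dict'> on Python 2
print(value["dataObjectID"])  # test1
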
answered by jencoston