 

Unable to convert Kafka topic data into structured JSON with Confluent Elasticsearch sink connector

I'm building a data pipeline using Kafka. The data flow is: capture data changes in MongoDB and send them to Elasticsearch, i.e. MongoDB → Debezium MongoDB source connector → Kafka → Elasticsearch sink connector → Elasticsearch.


MongoDB

  • version 3.6
  • shard cluster

Kafka

  • Confluent Platform 4.1.0
  • MongoDB source connector: Debezium 0.7.5
  • Elasticsearch sink connector

Elasticsearch

  • version 6.1.0

Since I'm still testing, all Kafka-related systems are running on a single server.

  • start ZooKeeper

    $ bin/zookeeper-server-start etc/kafka/zookeeper.properties
    
  • start bootstrap server

    $ bin/kafka-server-start etc/kafka/server.properties
    
  • start the Schema Registry

    $ bin/schema-registry-start etc/schema-registry/schema-registry.properties
    
  • start the MongoDB source connector

    $ bin/connect-standalone \ 
      etc/schema-registry/connect-avro-standalone.properties \ 
      etc/kafka/connect-mongo-source.properties
    
    $ cat etc/kafka/connect-mongo-source.properties
    >>> 
    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee
    
    $ cat etc/schema-registry/connect-avro-standalone.properties
    >>>
    bootstrap.servers=localhost:9092
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    rest.port=8083
    
  • start elasticsearch sink connector

    $ bin/connect-standalone \ 
      etc/schema-registry/connect-avro-standalone2.properties  \ 
      etc/kafka-connect-elasticsearch/elasticsearch.properties
    
    $ cat etc/kafka-connect-elasticsearch/elasticsearch.properties
    >>>
    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=higee.higee.higee
    key.ignore=true
    connection.url=''
    type.name=kafka-connect
    
    $ cat etc/schema-registry/connect-avro-standalone2.properties
    >>>
    bootstrap.servers=localhost:9092
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    rest.port=8084
    

Everything works with the setup above: the source connector captures data changes (CDC) and successfully sends them to Elasticsearch via the sink connector. The problem is that I cannot convert the string-typed message data into a structured data type. For instance, let's consume the topic after making some changes in MongoDB.

    $ bin/kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic higee.higee.higee --from-beginning | jq

Then I get the following result.

    "after": null,
      "patch": {
        "string": "{\"_id\" : {\"$oid\" : \"5ad97f982a0f383bb638ecac\"},\"name\" : \"higee\",\"salary\" : 100,\"origin\" : \"South Korea\"}"
      },
      "source": {
        "version": {
          "string": "0.7.5"
        },
        "name": "higee",
        "rs": "172.31.50.13",
        "ns": "higee",
        "sec": 1524214412,
        "ord": 1,
        "h": {
          "long": -2379508538412995600
        },
        "initsync": {
          "boolean": false
        }
      },
      "op": {
        "string": "u"
      },
      "ts_ms": {
        "long": 1524214412159
      }
    }

Then, if I look in Elasticsearch, I get the following result.

    {
        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": {
          "after": null,
          "patch": """{"_id" : {"$oid" : "5ad97f982a0f383bb638ecac"}, 
                       "name" : "higee", 
                       "salary" : 100,
                       "origin" : "South Korea"}""",
          "source": {
            "version": "0.7.5",
            "name": "higee",
            "rs": "172.31.50.13",
            "ns": "higee",
            "sec": 1524214412,
            "ord": 1,
            "h": -2379508538412995600,
            "initsync": false
          },
          "op": "u",
          "ts_ms": 1524214412159
        }
      }

What I want to achieve is something like the following:

    {
        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": {
          "oid" : "5ad97f982a0f383bb638ecac",
          "name" : "higee", 
          "salary" : 100,
          "origin" : "South Korea"
         }"
     }

Some of the options I've been trying and am still considering are as follows.

  • logstash

    • case 1: I don't know how to parse those control characters (\u0002, \u0001)

      • logstash.conf

        input {
          kafka {
            bootstrap_servers => ["localhost:9092"]
            topics => ["higee.higee.higee"]
            auto_offset_reset => "earliest"
            codec => json {
              charset => "UTF-8"
            }
          }
        }
        
        filter {
          json {
            source => "message"
          }
         }
        
        output {
          stdout {
            codec => rubydebug
          }
        }
        
      • result

        {
        "message" => "H\u0002�\u0001{\"_id\" : \
            {\"$oid\" : \"5adafc0e2a0f383bb63910a6\"}, \
             \"name\" : \"higee\", \
             \"salary\" : 101, \
             \"origin\" : \"South Korea\"} \
             \u0002\n0.7.5\nhigee \ 
             \u0018172.31.50.13\u001Ahigee.higee2 \ 
             ��ح\v\u0002\u0002��̗���� \u0002\u0002u\u0002�����X",
        "tags" => [[0] "_jsonparsefailure"]
        }
        
    • case 2

      • logstash.conf

        input {
          kafka {
            bootstrap_servers => ["localhost:9092"]
            topics => ["higee.higee.higee"]
            auto_offset_reset => "earliest"
            codec => avro {
              schema_uri => "./test.avsc"
            }
          }
        }
        
        filter {
          json {
            source => "message"
          }
        }
        
        output {
          stdout {
            codec => rubydebug
          }
        }
        
      • test.avsc

        {
            "namespace": "example",
            "type": "record",
            "name": "Higee",
            "fields": [
              {"name": "_id", "type": "string"},
              {"name": "name", "type": "string"},
              {"name": "salary",  "type": "int"},
              {"name": "origin", "type": "string"}
            ]
         }
        
      • result

        An unexpected error occurred! {:error=>#<NoMethodError: undefined method `type_sym' for nil:NilClass>, :backtrace=>
        ["/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:224:in `match_schemas'",
         "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:280:in `read_data'",
         "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:376:in `read_union'",
         "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:309:in `read_data'",
         "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:384:in `block in read_record'",
         "org/jruby/RubyArray.java:1734:in `each'",
         "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:382:in `read_record'",
         "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:310:in `read_data'",
         "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:275:in `read'",
         "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in `decode'",
         "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:254:in `block in thread_runner'",
         "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:253:in `block in thread_runner'"]}
        
  • python client

    • consume the topic, manipulate the data, and produce to a different topic, so that the Elasticsearch sink connector can consume well-formatted messages from the Python-manipulated topic
    • kafka-python library: wasn't able to decode the message

      from kafka import KafkaConsumer
      
      consumer = KafkaConsumer(
                   'higee.higee.higee',                # topics are positional arguments
                   bootstrap_servers='localhost:9092',
                   auto_offset_reset='earliest'
                 )
      
      for message in consumer:
          message.value.decode('utf-8')
      
      >>> 'utf-8' codec can't decode byte 0xe4 in position 6: 
          invalid continuation byte
      
    • confluent_kafka: didn't seem to be compatible with Python 3


Any idea how I can get properly structured JSON data into Elasticsearch? The following are the sources I searched:

  • MongoDB Debezium
  • MongoDB event flattening
  • Avro converter
  • serializing Debezium events
  • Debezium tutorial

Thanks in advance.


Some attempts

1) I changed my connect-mongo-source.properties file as follows to test the transformation.

    $ cat etc/kafka/connect-mongo-source.properties
    >>> 
    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee
    transforms=unwrap
    transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

And the following is the error log I got. Not yet being comfortable with Kafka and, more importantly, the Debezium platform, I wasn't able to debug this error.

ERROR WorkerSourceTask{id=mongodb-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.json.JsonParseException: JSON reader expected a string but found '0'.
    at org.bson.json.JsonReader.visitBinDataExtendedJson(JsonReader.java:904)
    at org.bson.json.JsonReader.visitExtendedJSON(JsonReader.java:570)
    at org.bson.json.JsonReader.readBsonType(JsonReader.java:145)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:82)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:41)
    at org.bson.codecs.BsonDocumentCodec.readValue(BsonDocumentCodec.java:101)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
    at org.bson.BsonDocument.parse(BsonDocument.java:62)
    at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:45)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:218)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2) This time, I changed elasticsearch.properties and left connect-mongo-source.properties unchanged.

$ cat connect-mongo-source.properties

    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee

$ cat elasticsearch.properties

    name=elasticsearch-sink
    connector.class = io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=higee.higee.higee
    key.ignore=true
    connection.url=''
    type.name=kafka-connect
    transforms=unwrap
    transforms.unwrap.type = io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

And I got the following error.

ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.BsonInvalidOperationException: Document does not contain key $set
    at org.bson.BsonDocument.throwIfKeyAbsent(BsonDocument.java:844)
    at org.bson.BsonDocument.getDocument(BsonDocument.java:135)
    at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:53)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

3) I changed test.avsc and ran Logstash. I didn't get any error message, but the outcome wasn't what I expected: the origin, salary, and name fields were all empty even though they had non-null values. I was even able to read the data properly through the console consumer.

$ cat test.avsc
>>>
    {
      "type" : "record",
      "name" : "MongoEvent",
      "namespace" : "higee.higee",
      "fields" : [ {
        "name" : "_id",
        "type" : {
          "type" : "record",
          "name" : "HigeeEvent",
          "fields" : [ {
            "name" : "$oid",
            "type" : "string"
          }, {
            "name" : "salary",
            "type" : "long"
          }, {
            "name" : "origin",
            "type" : "string"
          }, {
            "name" : "name",
            "type" : "string"
          } ]
        }
      } ]
    }

$ cat logstash3.conf
>>>
    input {
      kafka {
        bootstrap_servers => ["localhost:9092"]
        topics => ["higee.higee.higee"]
        auto_offset_reset => "earliest"
        codec => avro {
          schema_uri => "./test.avsc"
        }
      }
    }

    output {
      stdout {
       codec => rubydebug
      }
    }

$ bin/logstash -f logstash3.conf
>>>
    {
    "@version" => "1",
    "_id" => {
      "salary" => 0,
      "origin" => "",
      "$oid" => "",
      "name" => ""
    },
    "@timestamp" => 2018-04-25T09:39:07.962Z
    }
Asked Apr 21 '18 by Gee Yeol Nahm


3 Answers

Python Client

You must use the Avro consumer (one that knows about the Schema Registry); otherwise you will get 'utf-8' codec can't decode byte.

Even this example will not work because you still need the Schema Registry to look up the schema.

The prerequisites of Confluent's Python client say it works with Python 3.x.

Nothing is stopping you from using a different client, so I'm not sure why you stopped at only trying Python.
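
For reference, here is a minimal sketch (untested here; the group id is a placeholder and the URLs assume your single-server setup) of what an Avro-aware consumer looks like with confluent_kafka. It fetches the writer's schema from the Schema Registry using the ID embedded in each message and hands you a plain dict:

    from confluent_kafka.avro import AvroConsumer

    consumer = AvroConsumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'debug-avro-consumer',          # placeholder group id
        'schema.registry.url': 'http://localhost:8081'
    })
    consumer.subscribe(['higee.higee.higee'])

    msg = consumer.poll(10)                         # wait up to 10 s for one record
    if msg is not None and msg.error() is None:
        print(msg.value())                          # already deserialized into a dict
    consumer.close()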

Logstash Avro Codec

  1. The JSON codec cannot decode Avro data, and I don't think the json filter following the avro input codec will work either.
  2. Your Avro schema is wrong: the _id field needs to be a nested record containing $oid, not a plain string.
  3. There is a difference between "raw Avro" (which includes the schema within the message itself) and Confluent's encoded version of it (which only contains the schema ID pointing into the registry); see the sketch after this list. This means Logstash doesn't integrate with the Schema Registry... at least not without a plugin.
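
To make point 3 concrete, here is a rough, illustrative sketch (the function is mine, not from any library) of how a Confluent-framed message is laid out. Only a 4-byte schema ID travels with the record; the schema itself has to be fetched from the registry, which is why a plain avro codec or json codec fails on it:

    import struct

    def split_confluent_frame(raw_bytes):
        """Split a Schema-Registry-framed message into (schema_id, avro_payload)."""
        # Wire format: 1 magic byte (0x0) + 4-byte big-endian schema ID + Avro body
        magic, schema_id = struct.unpack('>bI', raw_bytes[:5])
        if magic != 0:
            raise ValueError('not a Confluent Schema Registry framed message')
        return schema_id, raw_bytes[5:]     # the Avro body carries no schema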

Your AVSC should actually look like this

{
  "type" : "record",
  "name" : "MongoEvent",
  "namespace" : "higee.higee",
  "fields" : [ {
    "name" : "_id",
    "type" : {
      "type" : "record",
      "name" : "HigeeEvent",
      "fields" : [ {
        "name" : "$oid",
        "type" : "string"
      }, {
        "name" : "salary",
        "type" : "long"
      }, {
        "name" : "origin",
        "type" : "string"
      }, {
        "name" : "name",
        "type" : "string"
      } ]
    }
  } ]
}

However, Avro doesn't allow for names starting with anything but a regex of [A-Za-z_], so that $oid would be a problem.

While I would not recommend it (nor have I actually tried it), one possible way to get your JSON-encoded Avro data into Logstash from the Avro console consumer could be to use the Pipe input plugin:

input {
  pipe {
    codec => json
    command => "/path/to/confluent/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic higee.higee.higee --from-beginning" 
  }
}

Debezium

note that the after value is always a string, and that by convention it will contain a JSON representation of the document

http://debezium.io/docs/connectors/mongodb/

I think this also applies to patch values, but I don't know Debezium, really.

Kafka won't parse the JSON in flight without the use of a Single Message Transform (SMT). Reading the documentation you linked to, you should probably add these to your Connect source properties:

transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

Also worth pointing out, field flattening is on the roadmap - DBZ-561

Kafka Connect Elasticsearch

Elasticsearch doesn't parse and process JSON-encoded string fields without the use of something like Logstash or its ingest JSON processor (sketched below). Rather, it only indexes them as one whole string body.
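
For example, a rough, untested sketch of the JSON-processor route (the pipeline name parse-patch and the target field patch_parsed are my own choices, not anything generated by Connect). Note you would still need some way to route the sink connector's writes through this pipeline:

    import json
    import requests

    ES = 'http://localhost:9200'    # placeholder; use your connection.url

    # Hypothetical ingest pipeline: expand the "patch" JSON string into an object
    pipeline = {
        "description": "parse Debezium's 'patch' string into a structured object",
        "processors": [
            {"json": {"field": "patch", "target_field": "patch_parsed"}}
        ]
    }
    resp = requests.put(ES + '/_ingest/pipeline/parse-patch',
                        data=json.dumps(pipeline),
                        headers={'Content-Type': 'application/json'})
    print(resp.json())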

If I recall correctly, Connect will only apply an Elasticsearch mapping to top-level Avro fields, not nested ones.

In other words, the mapping that is generated follows this pattern,

"patch": {
    "string": "...some JSON object string here..."
  },

Whereas you actually need it to look like this, perhaps by manually defining your ES index mapping (a sketch of doing so follows the snippet):

"patch": {
   "properties": {
      "_id": {
        "properties" {
          "$oid" :  { "type": "text" }, 
          "name" :  { "type": "text" },
          "salary":  { "type": "int"  }, 
          "origin": { "type": "text" }
      },

Again, not sure if the dollar sign is allowed, though.
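
If you do go the manual-mapping route, here is a rough sketch of creating the index up front (untested; the field types are guesses based on your document, I left $oid out because of the naming concern above, and this only helps once patch actually arrives as an object rather than a string):

    import json
    import requests

    ES = 'http://localhost:9200'    # placeholder; use your connection.url

    index_body = {
        "mappings": {
            "kafka-connect": {               # matches type.name in the sink config
                "properties": {
                    "patch": {
                        "properties": {
                            "name":   {"type": "text"},
                            "salary": {"type": "integer"},
                            "origin": {"type": "text"}
                        }
                    }
                }
            }
        }
    }
    # Create the index before the sink connector writes to it
    requests.put(ES + '/higee.higee.higee',
                 data=json.dumps(index_body),
                 headers={'Content-Type': 'application/json'})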

Kafka Connect MongoDB Source

If none of the above works, you could try a different connector:

  • Option 1
  • Option 2
Answered by OneCricketeer


I was able to solve this issue using a Python Kafka client. The new pipeline architecture is: MongoDB → Debezium source connector → Kafka topic → Python consumer/producer → new Kafka topic → Elasticsearch sink connector → Elasticsearch.


I used Python 2 even though the Confluent documentation says that Python 3 is supported. The main reason was that I ran into some Python 2-style syntax in code I was working from. For instance (not exactly the following line, but similar syntax):

    except NameError, err:

In order to use with Python3 I need to convert above lines into:

    except NameError as err:

That being said, the following is my Python code. Note that this code is only for prototyping and not production-ready yet.

Consume a message via Confluent Consumer

  • code

    from confluent_kafka.avro import AvroConsumer
    
    c = AvroConsumer({ 
           'bootstrap.servers': '',
           'group.id': 'groupid',
           'schema.registry.url': ''
        })
    
    c.subscribe(['higee.higee.higee'])
    
    x = True
    
    while x:
        msg = c.poll(100)              # wait up to 100 seconds for a record
        if msg:
            message = msg.value()      # value is already Avro-decoded into a dict
            print(message)
            x = False
    
    c.close()
    
  • (after updating a document in MongoDB) let's check the message variable

    {u'after': None,
     u'op': u'u',
     u'patch': u'{
         "_id" : {"$oid" : "5adafc0e2a0f383bb63910a6"},
         "name" : "higee",
         "salary" : 100,
         "origin" : "S Korea"}',
     u'source': {
         u'h': 5734791721791032689L,
         u'initsync': False,
         u'name': u'higee',
         u'ns': u'higee.higee',
         u'ord': 1,
         u'rs': u'',
         u'sec': 1524362971,
         u'version': u'0.7.5'},
     u'ts_ms': 1524362971148
     }
    

Manipulate the consumed message

  • code

    patch = message['patch']
    patch_dict = eval(patch)
    patch_dict.pop('_id')
    
  • check patch_dict

    {'name': 'higee', 'origin': 'S Korea', 'salary': 100}
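
As a side note, eval only happens to work here because the patch string is also a valid Python literal; json.loads is the safer way to parse it. A minimal sketch:

    import json

    patch_dict = json.loads(message['patch'])   # same result as eval, without executing code
    patch_dict.pop('_id', None)                 # drop the MongoDB _id wrapper as before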
    

Produce a message via Confluent Producer

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer

    value_schema_str = """
    {
       "namespace": "higee.higee",
       "name": "MongoEvent",
       "type": "record",
       "fields" : [
           {
               "name" : "name",
               "type" : "string"
           },
           {
              "name" : "origin",
              "type" : "string"
           },
           {
               "name" : "salary",
               "type" : "int32"
           }
       ]
    }
    """
    AvroProducerConf = {
        'bootstrap.servers': '',
        'schema.registry.url': ''
    }

    value_schema = avro.loads(value_schema_str)  # parse the inline schema defined above
    avroProducer = AvroProducer(
                       AvroProducerConf, 
                       default_value_schema=value_schema
                   )

    avroProducer.produce(topic='python', value=patch_dict)
    avroProducer.flush()

The only thing left is to make the Elasticsearch sink connector read the new topic 'python' by setting its configuration as follows. Everything remains the same except topics.

    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=python
    key.ignore=true
    connection.url=''
    type.name=kafka-connect

Then run the Elasticsearch sink connector and check the result in Elasticsearch:

    {
        "_index": "zzzz",
        "_type": "kafka-connect",
        "_id": "zzzz+0+3",
        "_score": 1,
        "_source": {
          "name": "higee",
          "origin": "S Korea",
          "salary": 100
        }
      }
Answered by Gee Yeol Nahm


+1 to @cricket_007's suggestion - use the io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope single message transformation. You can read more about SMTs and their benefits here.

Answered by Robin Moffatt