I'm building a data pipeline using Kafka. The data flow is as follows: capture data changes in MongoDB and send them to Elasticsearch.
MongoDB
Kafka
Elasticsearch
Since I'm still testing, all Kafka-related systems are running on a single server.
start zookeeper
$ bin/zookeeper-server-start etc/kafka/zookeeper.properties
start bootstrap server
$ bin/kafka-server-start etc/kafka/server.properties
start schema registry
$ bin/schema-registry-start etc/schema-registry/schema-registry.properties
start mongodb source connector
$ bin/connect-standalone \
etc/schema-registry/connect-avro-standalone.properties \
etc/kafka/connect-mongo-source.properties
$ cat etc/kafka/connect-mongo-source.properties
>>>
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=''
initial.sync.max.threads=1
tasks.max=1
mongodb.name=higee
$ cat etc/schema-registry/connect-avro-standalone.properties
>>>
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8083
start elasticsearch sink connector
$ bin/connect-standalone \
etc/schema-registry/connect-avro-standalone2.properties \
etc/kafka-connect-elasticsearch/elasticsearch.properties
$ cat etc/kafka-connect-elasticsearch/elasticsearch.properties
>>>
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=higee.higee.higee
key.ignore=true
connection.url=''
type.name=kafka-connect
$ cat etc/schema-registry/connect-avro-standalone2.properties
>>>
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8084
Everything works with the setup above. The source connector captures data changes (CDC) and the sink connector successfully sends them to Elasticsearch. The problem is that I cannot convert the string-typed message data into a structured data type. For instance, let's consume the topic data after making some changes in MongoDB.
$ bin/kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--topic higee.higee.higee --from-beginning | jq
Then I get the following result.
{
"after": null,
"patch": {
"string": "{\"_id\" : {\"$oid\" : \"5ad97f982a0f383bb638ecac\"},\"name\" : \"higee\",\"salary\" : 100,\"origin\" : \"South Korea\"}"
},
"source": {
"version": {
"string": "0.7.5"
},
"name": "higee",
"rs": "172.31.50.13",
"ns": "higee",
"sec": 1524214412,
"ord": 1,
"h": {
"long": -2379508538412995600
},
"initsync": {
"boolean": false
}
},
"op": {
"string": "u"
},
"ts_ms": {
"long": 1524214412159
}
}
Then, if I go to Elasticsearch, I get the following result.
{
"_index": "higee.higee.higee",
"_type": "kafka-connect",
"_id": "higee.higee.higee+0+3",
"_score": 1,
"_source": {
"after": null,
"patch": """{"_id" : {"$oid" : "5ad97f982a0f383bb638ecac"},
"name" : "higee",
"salary" : 100,
"origin" : "South Korea"}""",
"source": {
"version": "0.7.5",
"name": "higee",
"rs": "172.31.50.13",
"ns": "higee",
"sec": 1524214412,
"ord": 1,
"h": -2379508538412995600,
"initsync": false
},
"op": "u",
"ts_ms": 1524214412159
}
}
What I want to achieve is something like the following.
{
"_index": "higee.higee.higee",
"_type": "kafka-connect",
"_id": "higee.higee.higee+0+3",
"_score": 1,
"_source": {
"oid" : "5ad97f982a0f383bb638ecac",
"name" : "higee",
"salary" : 100,
"origin" : "South Korea"
}
}
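To make the target shape concrete, here is a minimal Python sketch of the transformation I have in mind. It assumes the event has already been decoded into a dict with a JSON string under patch (or after); the function name flatten_patch is hypothetical and only for illustration.

```python
import json

def flatten_patch(event):
    """Return the document stored as a JSON string in 'patch' (or 'after') as a flat dict."""
    raw = event.get("patch") or event.get("after")
    if raw is None:
        return None
    doc = json.loads(raw)
    # MongoDB extended JSON wraps the ObjectId as {"$oid": "..."};
    # lift it into a plain "oid" field, as in the desired output.
    _id = doc.pop("_id", None)
    if isinstance(_id, dict) and "$oid" in _id:
        doc["oid"] = _id["$oid"]
    return doc

# Sample event trimmed from the console-consumer output shown earlier
event = {
    "after": None,
    "patch": '{"_id" : {"$oid" : "5ad97f982a0f383bb638ecac"},'
             '"name" : "higee","salary" : 100,"origin" : "South Korea"}',
    "op": "u",
}
print(flatten_patch(event))
```

The open question is where in the pipeline (connector, Logstash, or a separate client) a step like this should run.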
Some of the options I've tried, or am still considering, are as follows.
logstash
case 1 : don't know how to parse those characters (\u0002, \u0001)
logstash.conf
input {
kafka {
bootstrap_servers => ["localhost:9092"]
topics => ["higee.higee.higee"]
auto_offset_reset => "earliest"
codec => json {
charset => "UTF-8"
}
}
}
filter {
json {
source => "message"
}
}
output {
stdout {
codec => rubydebug
}
}
result
{
"message" => "H\u0002�\u0001{\"_id\" : \
{\"$oid\" : \"5adafc0e2a0f383bb63910a6\"}, \
\"name\" : \"higee\", \
\"salary\" : 101, \
\"origin\" : \"South Korea\"} \
\u0002\n0.7.5\nhigee \
\u0018172.31.50.13\u001Ahigee.higee2 \
��ح\v\u0002\u0002��̗���� \u0002\u0002u\u0002�����X",
"tags" => [[0] "_jsonparsefailure"]
}
case 2
logstash.conf
input {
kafka {
bootstrap_servers => ["localhost:9092"]
topics => ["higee.higee.higee"]
auto_offset_reset => "earliest"
codec => avro {
schema_uri => "./test.avsc"
}
}
}
filter {
json {
source => "message"
}
}
output {
stdout {
codec => rubydebug
}
}
test.avsc
{
"namespace": "example",
"type": "record",
"name": "Higee",
"fields": [
{"name": "_id", "type": "string"},
{"name": "name", "type": "string"},
{"name": "salary", "type": "int"},
{"name": "origin", "type": "string"}
]
}
result
An unexpected error occurred! {:error=>#<NoMethodError:
undefined method `type_sym' for nil:NilClass>, :backtrace=>
["/home/ec2-user/logstash-
6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:224:in `match_schemas'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:280:in `read_data'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:376:in `read_union'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:309:in `read_data'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:384:in `block in read_record'",
"org/jruby/RubyArray.java:1734:in `each'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:382:in `read_record'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:310:in `read_data'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:275:in `read'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/
logstash-codec-avro-3.2.3-java/lib/logstash/codecs/
avro.rb:77:in `decode'", "/home/ec2-user/logstash-6.1.0/
vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-
8.0.2/lib/ logstash/inputs/kafka.rb:254:in `block in
thread_runner'", "/home/ec2-user/logstash-
6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-
8.0.2/lib/logstash/inputs/kafka.rb:253:in `block in
thread_runner'"]}
python client
kafka
library : wasn't able to decode message
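from kafka import KafkaConsumer
# Topic names are passed positionally to KafkaConsumer
consumer = KafkaConsumer(
    'higee.higee.higee',
    auto_offset_reset='earliest'
)
for message in consumer:
    # The value is Avro-encoded binary, so decoding it as UTF-8 text fails
    message.value.decode('utf-8')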
>>> 'utf-8' codec can't decode byte 0xe4 in position 6:
invalid continuation byte
confluent_kafka
wasn't compatible with python 3
Any idea how I can jsonify the data in Elasticsearch? The following are the sources I searched.
Thanks in advance.
Some attempts
1) I've changed my connect-mongo-source.properties file as follows to test transformation.
$ cat etc/kafka/connect-mongo-source.properties
>>>
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=''
initial.sync.max.threads=1
tasks.max=1
mongodb.name=higee
transforms=unwrap
transforms.unwrap.type = io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope
And the following is the error log I got. Not yet being comfortable with Kafka and, more importantly, the Debezium platform, I wasn't able to debug this error.
ERROR WorkerSourceTask{id=mongodb-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.json.JsonParseException: JSON reader expected a string but found '0'.
at org.bson.json.JsonReader.visitBinDataExtendedJson(JsonReader.java:904)
at org.bson.json.JsonReader.visitExtendedJSON(JsonReader.java:570)
at org.bson.json.JsonReader.readBsonType(JsonReader.java:145)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:82)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:41)
at org.bson.codecs.BsonDocumentCodec.readValue(BsonDocumentCodec.java:101)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
at org.bson.BsonDocument.parse(BsonDocument.java:62)
at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:45)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:218)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2) This time, I changed elasticsearch.properties and left connect-mongo-source.properties unchanged.
$ cat connect-mongo-source.properties
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=''
initial.sync.max.threads=1
tasks.max=1
mongodb.name=higee
$ cat elasticsearch.properties
name=elasticsearch-sink
connector.class = io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=higee.higee.higee
key.ignore=true
connection.url=''
type.name=kafka-connect
transforms=unwrap
transforms.unwrap.type = io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope
And I got the following error.
ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.BsonInvalidOperationException: Document does not contain key $set
at org.bson.BsonDocument.throwIfKeyAbsent(BsonDocument.java:844)
at org.bson.BsonDocument.getDocument(BsonDocument.java:135)
at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:53)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
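The trace suggests that the transform looks up a $set key in the patch, whereas the patch shown earlier appears to hold a whole document rather than update operators. A small diagnostic sketch to check which shape a patch string has; the function name describe_patch is hypothetical, and this is not part of Debezium.

```python
import json

def describe_patch(patch_json):
    """Report whether a patch string holds update operators or a full document."""
    doc = json.loads(patch_json)
    # Update operators such as $set appear as top-level keys starting with "$"
    operators = sorted(key for key in doc if key.startswith("$"))
    if operators:
        return "operators: " + ", ".join(operators)
    return "full document"

# A partial update carries operators (illustrative value)
print(describe_patch('{"$set": {"salary": 101}}'))
# The patch from the console-consumer output is a whole document
print(describe_patch(
    '{"_id" : {"$oid" : "5ad97f982a0f383bb638ecac"},'
    '"name" : "higee","salary" : 100,"origin" : "South Korea"}'
))
```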
3) I changed test.avsc and ran Logstash. I didn't get any error message, but the outcome wasn't what I expected: the origin, salary, and name fields were all empty even though they had been given non-null values. I was even able to read the data properly through the console consumer.
$ cat test.avsc
>>>
{
"type" : "record",
"name" : "MongoEvent",
"namespace" : "higee.higee",
"fields" : [ {
"name" : "_id",
"type" : {
"type" : "record",
"name" : "HigeeEvent",
"fields" : [ {
"name" : "$oid",
"type" : "string"
}, {
"name" : "salary",
"type" : "long"
}, {
"name" : "origin",
"type" : "string"
}, {
"name" : "name",
"type" : "string"
} ]
}
} ]
}
$ cat logstash3.conf
>>>
input {
kafka {
bootstrap_servers => ["localhost:9092"]
topics => ["higee.higee.higee"]
auto_offset_reset => "earliest"
codec => avro {
schema_uri => "./test.avsc"
}
}
}
output {
stdout {
codec => rubydebug
}
}
$ bin/logstash -f logstash3.conf
>>>
{
"@version" => "1",
"_id" => {
"salary" => 0,
"origin" => "",
"$oid" => "",
"name" => ""
},
"@timestamp" => 2018-04-25T09:39:07.962Z
}
You must use the Avro Consumer, otherwise you will get 'utf-8' codec can't decode byte
Even this example will not work because you still need the schema registry to lookup the schema.
The prerequisites of Confluent's Python Client say it works with Python 3.x
Nothing is stopping you from using a different client, so not sure why you left it at only trying Python.
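For context on why a plain UTF-8 decode fails: messages written by the Confluent Avro serializer are framed as one magic byte (0), a 4-byte big-endian schema ID, and then the Avro binary payload. A minimal sketch of splitting that frame, assuming the raw bytes come straight from a plain consumer; the function name and the sample frame are made up for illustration.

```python
import struct

def split_confluent_frame(raw):
    """Split a Confluent-framed message into (schema_id, avro_payload)."""
    if len(raw) < 5 or raw[0:1] != b"\x00":
        raise ValueError("not Confluent wire format: missing magic byte 0")
    # Bytes 1-4 hold the schema registry ID as a big-endian unsigned int
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return schema_id, raw[5:]

# Hypothetical frame: magic byte, schema ID 1, then made-up payload bytes
frame = b"\x00" + struct.pack(">I", 1) + b"\x02\xe4payload"
schema_id, payload = split_confluent_frame(frame)
print(schema_id, payload)
```

The payload still has to be decoded with the writer schema fetched from the registry by that ID, which is what the Avro consumer does for you.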
$oid
in place of _id
Your AVSC should actually look like this
{
"type" : "record",
"name" : "MongoEvent",
"namespace" : "higee.higee",
"fields" : [ {
"name" : "_id",
"type" : {
"type" : "record",
"name" : "HigeeEvent",
"fields" : [ {
"name" : "$oid",
"type" : "string"
}, {
"name" : "salary",
"type" : "long"
}, {
"name" : "origin",
"type" : "string"
}, {
"name" : "name",
"type" : "string"
} ]
}
} ]
}
However, Avro doesn't allow names that start with anything other than a character matching [A-Za-z_], so that $oid would be a problem.
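A quick way to check candidate field names against that rule is sketched below. It follows the Avro specification's naming rule (first character [A-Za-z_], remaining characters [A-Za-z0-9_]); the helper name is hypothetical.

```python
import re

# Avro names: first character [A-Za-z_], the rest [A-Za-z0-9_]
AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def is_valid_avro_name(name):
    """Return True if name satisfies the Avro naming rule."""
    return AVRO_NAME.fullmatch(name) is not None

for field in ["_id", "$oid", "salary"]:
    print(field, is_valid_avro_name(field))
```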
While I would not recommend it (nor have actually tried it), one possible way to get your JSON-encoded Avro data into Logstash from the Avro console consumer could be to use the Pipe input plugin
input {
pipe {
codec => json
command => "/path/to/confluent/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic higee.higee.higee --from-beginning"
}
}
note that the after value is always a string, and that by convention it will contain a JSON representation of the document
http://debezium.io/docs/connectors/mongodb/
I think this also applies to patch values, but I don't know Debezium, really.
Kafka won't parse the JSON in-flight without the use of a Simple Message Transform (SMT). Reading the documentation you linked to, you should probably add these to your Connect Source properties
transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope
Also worth pointing out, field flattening is on the roadmap - DBZ-561
Elasticsearch doesn't parse and process encoded JSON string objects without the use of something like Logstash or its JSON Processor. Rather, it only indexes them as a whole string body.
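As a rough illustration of the JSON Processor route, here is a sketch of an ingest pipeline definition built as a Python dict. The description text and the target field name patch_doc are assumptions, and how the pipeline gets attached to indexing depends on your Elasticsearch and connector versions, which is not shown here.

```python
import json

# Body for an ingest pipeline definition; the names below are assumptions
pipeline = {
    "description": "parse the Debezium patch string into an object",
    "processors": [
        # The json processor parses the string in "field" and stores
        # the resulting object under "target_field"
        {"json": {"field": "patch", "target_field": "patch_doc"}}
    ],
}
body = json.dumps(pipeline, indent=2)
print(body)
```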
If I recall correctly, Connect will only apply an Elasticsearch mapping to top-level Avro fields, not nested ones.
In other words, the mapping that is generated follows this pattern,
"patch": {
"string": "...some JSON object string here..."
},
Whereas what you actually need looks like this, perhaps by manually defining your ES index
"patch": {
"properties": {
"_id": {
"properties": {
"$oid" : { "type": "text" },
"name" : { "type": "text" },
"salary": { "type": "integer" },
"origin": { "type": "text" }
}
}
}
},
Again, not sure if the dollar sign is allowed, though.
If none of the above are working, you could attempt a different connector
I was able to solve this issue using the Python Kafka client. The following is the new architecture of my pipeline.
I used Python 2 even though the Confluent documentation says that Python 3 is supported. The main reason was that there was some code with Python 2 syntax. For instance (not exactly the following line, but similar syntax):
except NameError, err:
In order to use it with Python 3, I would need to convert lines like the above into:
except NameError as err:
That being said, the following is my Python code. Note that this code is only for prototyping and is not ready for production yet.
code
from confluent_kafka.avro import AvroConsumer
c = AvroConsumer({
    'bootstrap.servers': '',
    'group.id': 'groupid',
    'schema.registry.url': ''
})
c.subscribe(['higee.higee.higee'])
# Poll until the first message arrives, then stop
x = True
while x:
    msg = c.poll(100)
    if msg:
        message = msg.value()
        print(message)
        x = False
c.close()
(After updating a document in MongoDB) let's check the message variable.
{u'after': None,
u'op': u'u',
u'patch': u'{
"_id" : {"$oid" : "5adafc0e2a0f383bb63910a6"},
"name" : "higee",
"salary" : 100,
"origin" : "S Korea"}',
u'source': {
u'h': 5734791721791032689L,
u'initsync': False,
u'name': u'higee',
u'ns': u'higee.higee',
u'ord': 1,
u'rs': u'',
u'sec': 1524362971,
u'version': u'0.7.5'},
u'ts_ms': 1524362971148
}
code
import json
patch = message['patch']
# The patch is a JSON string, so parse it with json.loads rather than eval
patch_dict = json.loads(patch)
patch_dict.pop('_id')
check patch_dict
{'name': 'higee', 'origin': 'S Korea', 'salary': 100}
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
# Avro has no "int32" type; its 32-bit integer type is named "int"
value_schema_str = """
{
"namespace": "higee.higee",
"name": "MongoEvent",
"type": "record",
"fields" : [
{
"name" : "name",
"type" : "string"
},
{
"name" : "origin",
"type" : "string"
},
{
"name" : "salary",
"type" : "int"
}
]
}
"""
AvroProducerConf = {
    'bootstrap.servers': '',
    'schema.registry.url': ''
}
# Parse the schema string defined above (avro.load would read it from a file instead)
value_schema = avro.loads(value_schema_str)
avroProducer = AvroProducer(
    AvroProducerConf,
    default_value_schema=value_schema
)
avroProducer.produce(topic='python', value=patch_dict)
avroProducer.flush()
The only thing left is to make the Elasticsearch sink connector respond to the new topic 'python' by setting the configuration as follows. Everything remains the same except topics.
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=python
key.ignore=true
connection.url=''
type.name=kafka-connect
Then run the Elasticsearch sink connector and check the result in Elasticsearch.
{
"_index": "zzzz",
"_type": "kafka-connect",
"_id": "zzzz+0+3",
"_score": 1,
"_source": {
"name": "higee",
"origin": "S Korea",
"salary": 100
}
}
+1 to @cricket_007's suggestion - use the io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope single message transformation. You can read more about SMTs and their benefits here.