 

Kafka Connect HDFS sink connector is failing even when JSON data contains schema and payload fields

I am trying the Kafka Connect HDFS sink connector to move JSON data from Kafka to HDFS.

Even though the JSON data in Kafka contains schema and payload fields, the Kafka Connect task is failing with this error:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields.

Data in Kafka:

./bin/kafka-console-consumer --topic test_hdfs_json_schema_payload_1 --zookeeper localhost:2181 --from-beginning

{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "deepak","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "sufi","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "vikas","company": "BT"}}

I submitted the HDFS sink job using the command below:

curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-cluster-15may-308pm", "config": {"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max":"2", "hdfs.url": "hdfs://localhost:9000","flush.size": "1","topics":"test_hdfs_json_schema_payload_1","topics.dir":"/deepak/kafka-connect/trial1"}}' http://localhost:8083/connectors

Distributed Kafka Connect worker configuration:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

Error message:

http://localhost:8083/connectors/connect-cluster-15may-308pm/tasks/0/status

{
    "state": "FAILED",
    "trace": "org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n",
    "id": 0,
    "worker_id": "127.0.0.1:8083"
}
asked May 15 '17 by Deepak Kumar

1 Answer

What version of Kafka Connect are you using? It helps to know this when working from the stacktrace to determine the source of the error.

I think what's happening is that you have data in the values, but not in the keys. Since you have both key.converter and value.converter set to the JsonConverter with schemas.enable=true, it expects to see the envelope format containing schema and payload for both keys and values. However, I'm guessing your keys are all null.

This is roughly the reverse of the problem described in https://issues.apache.org/jira/browse/KAFKA-3832, where the JsonConverter never generates true null values; instead, it always generates the envelope containing the expected optional schema plus a null payload. Here, the conversion from Kafka into Connect's data API fails because it expects that same envelope format in the keys.
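
For reference, with schemas.enable=true the JsonConverter serializes a null Connect value as an envelope rather than as a literal JSON null, so a null key or value would appear on the topic as something like:

{"schema": null, "payload": null}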

You can verify this is the problem by adding --property print.key=true to your console consumer command. If it is printing out null keys, the problem is that the JsonConverter cannot decode them.
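
For example, reusing the console consumer invocation from the question:

./bin/kafka-console-consumer --topic test_hdfs_json_schema_payload_1 --zookeeper localhost:2181 --from-beginning --property print.key=true

This prints each record's key before its value (tab-separated); null keys show up as the literal string null.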

An easy workaround is to just use some other Converter for keys that doesn't care about null values -- there's no data in the keys anyway. One that ships with Kafka Connect is org.apache.kafka.connect.storage.StringConverter.
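
As a minimal sketch, the worker configuration from the question could be changed like this (the StringConverter has no schemas.enable setting, so that line is simply dropped for keys; the workers need a restart for the change to take effect):

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true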

answered Oct 14 '22 by Ewen Cheslack-Postava