I am trying to use the Kafka Connect HDFS sink connector to move JSON data from Kafka to HDFS.
Even though the JSON data in Kafka has both a schema and a payload, the Kafka Connect task fails with this error:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields.
Data in Kafka:
./bin/kafka-console-consumer --topic test_hdfs_json_schema_payload_1 --zookeeper localhost:2181 --from-beginning
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "deepak","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "sufi","company": "BT"}}
{"schema": {"type": "struct","fields": [{"type": "string","optional": false,"field": "Name"}, {"type": "string","optional": false,"field": "company"}],"optional": false,"name": "Person"},"payload": {"Name": "vikas","company": "BT"}}
I submitted the HDFS sink job using the command below:
curl -X POST -H "Content-Type: application/json" --data '{"name": "connect-cluster-15may-308pm", "config": {"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max":"2", "hdfs.url": "hdfs://localhost:9000","flush.size": "1","topics":"test_hdfs_json_schema_payload_1","topics.dir":"/deepak/kafka-connect/trial1"}}' http://localhost:8083/connectors
Distributed Kafka Connect worker configuration:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
Error message (task status):
http://localhost:8083/connectors/connect-cluster-15may-308pm/tasks/0/status
{
"state": "FAILED",
"trace": "org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n",
"id": 0,
"worker_id": "127.0.0.1:8083"
}
What version of Kafka Connect are you using? It helps to know this when working from the stacktrace to determine the source of the error.
I think what's happening is that you have data in the values, but not in the keys. Since you have both key.converter and value.converter set to the JsonConverter and with schemas.enable=true, it is expecting to see the envelope format containing schema and payload for both. However, I'm guessing your keys are all null.
This is sort of the reverse of the problem in https://issues.apache.org/jira/browse/KAFKA-3832, where JsonConverter never generates true null values. Instead it always generates the envelope containing the expected optional schema + a null payload. In this case, converting from Kafka to Connect's data API isn't working since it is expecting the same envelope format in the keys.
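To make the failure mode concrete, here is a rough Python sketch of the kind of envelope check described above. It is not the actual JsonConverter code; the function name check_envelope is made up for illustration, and the logic only approximates what the error message says (exactly a "schema" and a "payload" field, nothing else).

```python
import json

def check_envelope(raw):
    """Roughly mimic the schemas.enable=true envelope check (illustrative only)."""
    # A null Kafka key or value carries no bytes at all, so there is no envelope.
    if raw is None:
        return False
    try:
        node = json.loads(raw)
    except json.JSONDecodeError:
        return False
    # The envelope must be an object with exactly "schema" and "payload".
    return isinstance(node, dict) and set(node.keys()) == {"schema", "payload"}

# One of the record values from the question.
sample_value = (
    '{"schema": {"type": "struct","fields": ['
    '{"type": "string","optional": false,"field": "Name"}, '
    '{"type": "string","optional": false,"field": "company"}],'
    '"optional": false,"name": "Person"},'
    '"payload": {"Name": "deepak","company": "BT"}}'
)

value_ok = check_envelope(sample_value)  # the value is a proper envelope
key_ok = check_envelope(None)            # a null key is not
```

Under these assumptions the value passes and the null key does not, which matches the theory that the key converter, not the value converter, is raising the DataException.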
You can verify this is the problem by adding --property print.key=true to your console consumer command. If it is printing out null keys, the problem is that the JsonConverter cannot decode them.
An easy workaround is to just use some other Converter for keys that doesn't care about null values -- there's no data in the keys anyway. One that ships with Kafka Connect is org.apache.kafka.connect.storage.StringConverter.
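As a sketch of that workaround, the worker configuration from the question could be changed along these lines. This assumes the keys really are null (or plain strings) and that the values keep the schema/payload envelope; the worker would need a restart to pick up the change.

```properties
# Keys: StringConverter tolerates null keys, so no envelope is required.
key.converter=org.apache.kafka.connect.storage.StringConverter
# Values: unchanged, still expect the {"schema": ..., "payload": ...} envelope.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
```

The key.converter.schemas.enable line from the original configuration is left out here on the assumption that it only matters to JsonConverter.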