ElasticsearchSinkConnector Failed to deserialize data to Avro

I created the simplest Kafka sink connector config, using Confluent 4.1.0:

{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "type.name": "test-type",
  "tasks.max": "1",
  "topics": "dialogs",
  "name": "elasticsearch-sink",
  "key.ignore": "true",
  "connection.url": "http://localhost:9200",
  "schema.ignore": "true"
}
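One thing to note about this config: it sets no key/value converters, so whatever converters the Connect worker was started with apply to this connector (in Confluent distributions that is often the AvroConverter). A small sketch of that observation:

```python
import json

# The connector config from the question. It overrides no converters,
# so the worker-level defaults apply to this connector.
config = json.loads("""
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "type.name": "test-type",
  "tasks.max": "1",
  "topics": "dialogs",
  "name": "elasticsearch-sink",
  "key.ignore": "true",
  "connection.url": "http://localhost:9200",
  "schema.ignore": "true"
}
""")
print("value.converter" in config)  # False: the worker default converter is used
```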

The messages in the topic are plain JSON, e.g.:

{ "topics": "resd"}

But the connector fails with this error:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Mikhail asked Sep 02 '25
2 Answers

As cricket_007 says, you need to tell Kafka Connect to use the JSON converter, if that's the format your data is in. Add this to your connector configuration:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false"
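With schemas.enable set to false, the JsonConverter reads each message payload as plain JSON with no embedded schema/payload envelope. Conceptually (a simplified model, not the actual converter code; the function name is mine):

```python
import json

def json_converter_to_connect(payload: bytes):
    """Simplified model of JsonConverter with schemas.enable=false:
    the raw message bytes are parsed directly as JSON."""
    return json.loads(payload)

# The JSON message from the question deserializes cleanly:
print(json_converter_to_connect(b'{ "topics": "resd"}'))  # {'topics': 'resd'}
```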
Robin Moffatt answered Sep 04 '25

That error happens because the AvroConverter is trying to read messages that were not encoded in the Confluent Schema Registry wire format.

If the topic data is Avro, it needs to use the Schema Registry.

Otherwise, if the topic data is JSON, then you've started the Connect cluster with AvroConverter configured for keys or values in the worker properties file, and you need to use JsonConverter there instead.
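The "magic byte" in the error is literal: the Confluent wire format prefixes every Avro message with a 0x00 byte followed by a 4-byte big-endian schema ID, then the Avro body. A minimal sketch of that check (the framing is the documented Confluent wire format; the helper function is mine for illustration):

```python
# Confluent wire format: byte 0 is the magic byte 0x00, bytes 1-4 are a
# big-endian Schema Registry ID, and the Avro-encoded body follows.
MAGIC_BYTE = 0x00

def looks_like_confluent_avro(payload: bytes) -> bool:
    """Return True if the payload could be Confluent wire-format Avro."""
    return len(payload) >= 5 and payload[0] == MAGIC_BYTE

# A plain JSON message starts with '{' (0x7b), not 0x00, so the
# AvroConverter rejects it with "Unknown magic byte!".
print(looks_like_confluent_avro(b'{ "topics": "resd"}'))  # False
```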

OneCricketeer answered Sep 04 '25