
Produce Avro messages in Confluent Control Center UI

To develop a data transfer application, I first need to define key/value Avro schemas. The producer application has not been developed yet; it is waiting on the Avro schema definitions.

I cloned a topic and its key/value Avro schemas that are already working, and also cloned the JDBC sink connector. I simply changed the topic and connector names.

Then, using the Confluent Topic Message UI producer, I copied an existing message that had previously reached the sink successfully.

But the sink connector fails with the error "Unknown magic byte!":

Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:323)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:164)
        at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
        ... 17 more
[2022-07-25 03:45:42,385] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask)

Reading other questions, it seems the message has to be serialized using the schema:

Unknown magic byte with kafka-avro-console-consumer
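For context, the "magic byte" the error refers to is the first byte of the Confluent Schema Registry wire format, which the AvroConverter expects in front of every record. A small sketch of the framing (pure shell, no Kafka needed):

```shell
# Confluent's wire format prepends a magic byte (0x00) and a big-endian
# 4-byte schema ID to every Avro payload before the Avro-encoded data.
# AvroConverter checks that first byte and throws "Unknown magic byte!"
# when it is anything other than 0x00.

# Hex dump of a correctly framed prefix (magic byte + schema ID 1):
printf '\000\000\000\000\001' | od -An -tx1 | tr -d ' '   # -> 0000000001

# A message typed as plain JSON in the topic UI starts with '{' instead:
printf '{' | od -An -tx1 | tr -d ' '                      # -> 7b
```

So a message pasted as raw JSON text into the UI begins with `0x7b`, not `0x00`, which is exactly the failure in the stack trace above.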

Is it possible to send a message to a topic with Avro key/value schemas using the Confluent topic UI?

Any idea whether the Avro schemas need information that depends on the connector/source, or whether the namespace depends on the topic name?

This is my key schema (the topic's name is knov_03):

{
  "connect.name": "dbserv1.MY_DB_SCHEMA.ps_sap_incoming.Key",
  "fields": [
    {
      "name": "id_sap_incoming",
      "type": "long"
    }
  ],
  "name": "Key",
  "namespace": "dbserv1.MY_DB_SCHEMA.ps_sap_incoming",
  "type": "record"
}


Connector:

{
  "name": "knov_05",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "topics": "knov_03",
    "connection.url": "jdbc:mysql://eXXXXX:3306/MY_DB_SCHEMA?useSSL=FALSE&nullCatalogMeansCurrent=true",
    "connection.user": "USER",
    "connection.password": "PASSWORD",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id_sap_incoming",
    "auto.create": "true",
    "auto.evolve": "true",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Thanks.

asked Dec 13 '25 at 22:12 by Luis Estrada

2 Answers

As of the current version of Confluent Control Center, it does not support sending messages in Avro format directly through its UI. The Control Center is primarily designed for managing and monitoring your Kafka environment.

For producing Avro messages, you would typically use other tools provided by Confluent, such as the Confluent REST Proxy or the Kafka Avro Console Producer. Both of these tools allow you to send Avro messages to Kafka, but they require that your Avro schema is registered with the Confluent Schema Registry.
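As a concrete illustration, a record for the knov_03 topic could be produced with kafka-avro-console-producer roughly like this. The broker address and the inline value schema are assumptions (only the key schema appears in the question), and older versions of the tool use `--broker-list` instead of `--bootstrap-server`:

```shell
# Sketch: produce one record with an Avro key and value. The tool talks
# to the Schema Registry and adds the magic-byte + schema-ID framing
# that the topic UI omits. Assumed: localhost:9092, value-schema.avsc.
kafka-avro-console-producer \
  --bootstrap-server localhost:9092 \
  --topic knov_03 \
  --property schema.registry.url=http://schema-registry:8081 \
  --property parse.key=true \
  --property key.separator='|' \
  --property key.schema='{"type":"record","name":"Key","namespace":"dbserv1.MY_DB_SCHEMA.ps_sap_incoming","fields":[{"name":"id_sap_incoming","type":"long"}]}' \
  --property value.schema="$(cat value-schema.avsc)"
# then type records on stdin as: {"id_sap_incoming": 1}|{...value fields...}
```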

answered Dec 16 '25 at 23:12 by pkonopacki


Answer from 2024:

I encountered a similar situation recently and found a convenient way to send Avro format messages using the Confluent CLI.

First, you can follow this document to install the Confluent CLI, then configure your API key and secret:

confluent api-key store your-api-key your-api-secret --resource kafka-cluster-id
confluent api-key use your-api-key
  • Replace your-api-key and your-api-secret with the generated values.
  • Replace kafka-cluster-id with your Kafka cluster ID.

In your Kafka topic tab, go to the Schema tab to find the schema ID (usually a number).

Then you can just write the message in standard JSON format, like:

{"registertime": 1715086531577, "userid": "someid", "regionid": "UK", "gender": "FEMALE"}

To send this message, you just need to run the following command:

echo '{"registertime": 1715086531577, "userid": "someid", "regionid": "UK", "gender": "FEMALE"}' | \
confluent kafka topic produce your_topic --value-format avro --schema 123456
  • Replace your_topic with your Kafka topic name
  • Replace 123456 with your schema id

If everything runs alright, you'll see the message: Starting Kafka Producer. Use Ctrl-C or Ctrl-D to exit.

Now the message, in Avro format, should appear in your Kafka topic.

answered Dec 16 '25 at 22:12 by Sen Lin


