Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka Connect JDBC sink connector not working

I am trying to use Kafka Connect JDBC sink connector to insert data into Oracle but it is throwing an error . I have tried with all the possible configurations of the schema. Below is the examples .

Please suggest if I am missing anything below are my configurations files and errors.

Case 1- First Configuration

internal.value.converter.schemas.enable=false .

so I am getting the

[2017-08-28 16:16:26,119] INFO Sink task WorkerSinkTask{id=oracle_sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:233)

[2017-08-28 16:16:26,606] INFO Discovered coordinator dfw-appblx097-01.prod.walmart.com:9092 (id: 2147483647 rack: null) for group connect-oracle_sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)

[2017-08-28 16:16:26,608] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)

[2017-08-28 16:16:26,609] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)

[2017-08-28 16:16:27,174] INFO Successfully joined group connect-oracle_sink with generation 26 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)

[2017-08-28 16:16:27,176] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, DJ-8] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)

[2017-08-28 16:16:28,580] ERROR Task oracle_sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)

org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: DJ

   at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:190)

   at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:58)

   at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:65)

   at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)

   at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)

   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)

   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)

   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)

   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)

   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)

   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)

   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

   at java.util.concurrent.FutureTask.run(FutureTask.java:266)

   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

   at java.lang.Thread.run(Thread.java:748)

2nd Configuration -

internal.key.converter.schemas.enable=true

internal.value.converter.schemas.enable=true

Log:

[2017-08-28 16:23:50,993] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)

[2017-08-28 16:23:50,993] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)

[2017-08-28 16:23:51,260] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)

[2017-08-28 16:23:51,381] INFO Successfully joined group connect-oracle_sink with generation 29 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)

[2017-08-28 16:23:51,384] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, DJ-8] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)

[2017-08-28 16:23:51,727] ERROR Task oracle_sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

   at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)

Oracle connector.properties looks like

name=oracle_sink

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector

tasks.max=1

topics=DJ

connection.url=jdbc:oracle:thin:@hostname:port:sid

connection.user=username

connection.password=password

#key.converter=org.apache.kafka.connect.json.JsonConverter

#value.converter=org.apache.kafka.connect.json.JsonConverter

auto.create=true

auto.evolve=true

Connect-Standalone.properties

My JSON looks like -

{"Item":"12","Sourcing Reason":"corr","Postal Code":"l45","OrderNum":"10023","Intended Node Distance":1125.8,"Chosen Node":"34556","Quantity":1,"Order Date":1503808765201,"Intended Node":"001","Chosen Node Distance":315.8,"Sourcing Logic":"reducesplits"}
like image 705
Digvijay Waghela Avatar asked Aug 28 '17 23:08

Digvijay Waghela


People also ask

How does Kafka JDBC connector work?

The JDBC connector for Kafka Connect is included with Confluent Platform and can also be installed separately from Confluent Hub. It enables you to pull data (source) from a database into Kafka, and to push data (sink) from a Kafka topic to a database.

What is a Kafka sink connector?

The MongoDB Kafka sink connector is a Kafka Connect connector that reads data from Apache Kafka and writes data to MongoDB.

What is JDBC sink connector?

The Kafka Connect JDBC Sink connector allows you to export data from Apache Kafka® topics to any relational database with a JDBC driver. This connector can support a wide variety of databases. The connector polls data from Kafka to write to the database based on the topics subscription.

How do I add connectors to Kafka connect?

Install Connector Manually Kafka Connect isolates each plugin so that the plugin libraries do not conflict with each other. To manually install a connector: Find your connector on Confluent Hub and download the connector ZIP file. Extract the ZIP file contents and copy the contents to the desired location.


2 Answers

Per the documentation

The sink connector requires knowledge of schemas, so you should use a suitable converter e.g. the Avro converter that comes with the schema registry, or the JSON converter with schemas enabled.

So if your data is JSON you would have the following configuration:

[...]
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
[...]

The error you see in the second instance is pertinent -- JsonConverter with schemas.enable requires "schema" and "payload" fields - the JSON you share does not meet this required format.

Here's a simple example of a valid JSON message with schema and payload :

{
    "schema": {
        "type": "struct",
        "fields": [{
            "type": "int32",
            "optional": true,
            "field": "c1"
        }, {
            "type": "string",
            "optional": true,
            "field": "c2"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "create_ts"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "update_ts"
        }],
        "optional": false,
        "name": "foobar"
    },
    "payload": {
        "c1": 10000,
        "c2": "bar",
        "create_ts": 1501834166000,
        "update_ts": 1501834166000
    }
}

What's your source for the data that you're trying to land to Oracle? If it's Kafka Connect inbound then you simply use the same converter configuration (Avro + Confluent Schema Registry) would be easier and more efficient. If it's a custom application, you'll need to get it to either (a) use the Confluent Avro serialiser or (b) write the JSON in the required format above, providing the schema of the payload inline with the message.

like image 100
Robin Moffatt Avatar answered Oct 12 '22 06:10

Robin Moffatt


I've the same problem, after the read this post. I has been resolved with JDBC Sink MySQL Below my Kafka Connect Configuration, as additional information:

curl --location --request POST 'http://localhost:8083/connectors/' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "ttib-transactions",
        "connection.url": "jdbc:mysql://172.17.0.1:6603/tt-tran?verifyServerCertificate=true&useSSL=false",
        "connection.user": "root",
        "connection.password": "*******",
        "value.converter.schema.registry.url": "https://psrc-j55zm.us-central1.gcp.confluent.cloud",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "insert.mode": "insert",
        "batch.size":"0",
        "table.name.format": "${topic}",
        "pk.fields" :"id"
    }
}'
like image 34
dmotta Avatar answered Oct 12 '22 07:10

dmotta