I use curl to submit to our Kafka Connect service a JSON request message with information about the connector, it is working successfully.
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ \"name\": \"inventory-connector\", \"config\": { \"connector.class\": \"io.debezium.connector.mysql.MySqlConnector\", \"tasks.max\": \"1\", \"database.hostname\": \"mysql\", \"database.port\": \"3306\", \"database.user\": \"debezium\", \"database.password\": \"dbz\", \"database.server.id\": \"184054\", \"database.server.name\": \"dbserver1\", \"database.whitelist\": \"inventory\", \"database.history.kafka.bootstrap.servers\": \"kafka:9092\", \"database.history.kafka.topic\": \"dbhistory.inventory\" } }'
now I am using node.js server to send data to kafka connect server.
var body = {
"name": "abc",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
};
var options = {
method: 'PUT',
uri: 'http://localhost/connectors/abc/config',
headers: {
'User-Agent': 'Request-Promise'
},
json: true ,
body: body
};
rp(options)
.then(function (data) {
return res.status(200).json({ 'data': data});
})
.catch(function (err) {
console.log(err);
return res.status(500).json({ error: err});
});
however the code throw out an error: saying
{ StatusCodeError: 500 - {"error_code":500,"message":"Cannot deserialize instance of `java.lang.String` out of START_OBJECT token\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 42] (through reference chain: java.util.LinkedHashMap[\"config\"])"}
The API description is from https://docs.confluent.io/current/connect/references/restapi.html

If I read the confluent doc correctly, you did mix up two different API endpoints.
In your code, you use the endpoint /connectors/abc/config, which according to the documentation takes a single config object as toplevel, so like this:
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
But your JSON object looks like it was meant for the /connectors endpoint.
Changing either the endpoint or your JSON object to match the endpoint you have chosen may fix the problem.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With