I have set up a dockerized Kafka Connect cluster running in distributed mode. I am trying to set up a Kafka JDBC Source Connector to move data from Microsoft SQL Server into Kafka.
Below is the response from my connector-plugins API:
[
  {
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",
    "version": "4.0.0"
  },
  {
    "class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "type": "sink",
    "version": "4.0.0"
  },
  {
    "class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector",
    "type": "source",
    "version": "1.0.0-cp1"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "4.0.0"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "type": "source",
    "version": "4.0.0"
  },
  {
    "class": "io.debezium.connector.mongodb.MongoDbConnector",
    "type": "source",
    "version": "0.7.4"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "0.7.4"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "1.0.0-cp1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "1.0.0-cp1"
  }
]
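For reference, the listing above is just the output of the worker's standard REST endpoint (same placeholder host as the request further down):

curl -s http://kafka-connect-cluster.com/connector-plugins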
I have already added the JDBC driver provided by Microsoft SQL Server to the plugin path of my Kafka Connect cluster.
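One way to double-check that the worker can actually see the jar is to list the plugin directory inside the running container. The container name and path below are illustrative for my setup, not part of the original problem; substitute your own:

# Container name and path are assumptions; /usr/share/java is the default
# plugin location in the Confluent Platform docker images.
docker exec kafka-connect ls -l /usr/share/java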
Below is the request I send to the connectors API:
curl -X POST \
  http://kafka-connect-cluster.com/connectors \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json' \
  -d '{
    "name": "mssql-source-connector",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "mode": "timestamp",
      "timestamp.column.name": "updateTimeStamp",
      "query": "select * from table_name",
      "tasks.max": "1",
      "table.types": "TABLE",
      "key.converter.schemas.enable": "false",
      "topic.prefix": "data_",
      "value.converter.schemas.enable": "false",
      "connection.url": "jdbc:sqlserver://<host>:<port>;databaseName=<dbName>;",
      "connection.user": "<username>",
      "connection.password": "<password>",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "poll.interval.ms": "5000",
      "table.poll.interval.ms": "120000"
    }
  }'
The error that I get when submitting this request is as follows:
{
  "error_code": 400,
  "message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:sqlserver://<host>:<port>;databaseName=<db_name>; for configuration Couldn't open connection to jdbc:sqlserver://<host>:<port>;databaseName=<db_name>;\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:sqlserver://<host>:<port>;databaseName=<db_name;> for configuration Couldn't open connection to jdbc:sqlserver://<host>:<port>;databaseName=<db_name;>\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}
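The error message itself points to the validate endpoint, and the same two errors can be reproduced directly against it with the identical config (note it is a PUT, the connector class goes in the URL, and the config map is sent unwrapped):

curl -X PUT \
  http://kafka-connect-cluster.com/connector-plugins/io.confluent.connect.jdbc.JdbcSourceConnector/config/validate \
  -H 'Content-Type: application/json' \
  -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "mode": "timestamp",
    "timestamp.column.name": "updateTimeStamp",
    "query": "select * from table_name",
    "topic.prefix": "data_",
    "connection.url": "jdbc:sqlserver://<host>:<port>;databaseName=<dbName>;",
    "connection.user": "<username>",
    "connection.password": "<password>"
  }'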
Any help you can provide is highly appreciated.
Thanks
Credit for this answer goes to @rmoff for pointing me in the right direction.

The issue lay in two places:

1. I was overriding CONNECT_PLUGIN_PATH. There is nothing wrong with doing that as such, but it is generally not a good idea, because you then have to copy across all the base plugins that ship with the Confluent Platform; this becomes a problem when you move to a new version, as you might have to go through the same process all over again.

2. The SQL Server JDBC driver jar has to live in the same directory as kafka-connect-jdbc-<confluent-version>.jar, which in my case is kafka-connect-jdbc-4.0.0.jar. If the driver sits anywhere else on the plugin path, the JDBC connector's classloader cannot see it, and you get the "No suitable driver found" error above.

Once these two points were addressed, my SQL Server JDBC driver started working as expected.
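For anyone hitting the same error, here is a minimal sketch of the fix on a running dockerized worker. The container name, driver jar name, and version are illustrative for my setup; /usr/share/java/kafka-connect-jdbc is where the Confluent Platform images keep the kafka-connect-jdbc jar:

# Drop the Microsoft driver into the same directory as
# kafka-connect-jdbc-4.0.0.jar, then restart the worker so it is picked up.
# Container and jar names are illustrative; substitute your own.
docker cp mssql-jdbc-6.2.2.jre8.jar kafka-connect:/usr/share/java/kafka-connect-jdbc/
docker restart kafka-connect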