I have set up a dockerized Kafka Connect cluster running in distributed mode. I am trying to set up a Kafka JDBC source connector to move data from Microsoft SQL Server into Kafka.
Below is the response from my connector-plugins API:
[
    {
    class: "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    type: "sink",
    version: "4.0.0"
    },
    {
    class: "io.confluent.connect.hdfs.HdfsSinkConnector",
    type: "sink",
    version: "4.0.0"
    },
    {
    class: "io.confluent.connect.hdfs.tools.SchemaSourceConnector",
    type: "source",
    version: "1.0.0-cp1"
    },
    {
    class: "io.confluent.connect.jdbc.JdbcSinkConnector",
    type: "sink",
    version: "4.0.0"
    },
    {
    class: "io.confluent.connect.jdbc.JdbcSourceConnector",
    type: "source",
    version: "4.0.0"
    },
    {
    class: "io.debezium.connector.mongodb.MongoDbConnector",
    type: "source",
    version: "0.7.4"
    },
    {
    class: "io.debezium.connector.mysql.MySqlConnector",
    type: "source",
    version: "0.7.4"
    },
    {
    class: "org.apache.kafka.connect.file.FileStreamSinkConnector",
    type: "sink",
    version: "1.0.0-cp1"
    },
    {
    class: "org.apache.kafka.connect.file.FileStreamSourceConnector",
    type: "source",
    version: "1.0.0-cp1"
    }
]
I have already added the JDBC driver provided by Microsoft for SQL Server to the plugin path of my Kafka Connect cluster.
Below is the request I send to the connectors API:
curl -X POST \
  http://kafka-connect-cluster.com/connectors \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json' \
  -d '{
"name": "mssql-source-connector",
"config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "mode": "timestamp",
        "timestamp.column.name": "updateTimeStamp",
        "query": "select * from table_name",
        "tasks.max": "1",
        "table.types": "TABLE",
        "key.converter.schemas.enable": "false",
        "topic.prefix": "data_",
        "value.converter.schemas.enable": "false",
        "connection.url": "jdbc:sqlserver://<host>:<port>;databaseName=<dbName>;",
        "connection.user": "<username>",
        "connection.password": "<password>",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "poll.interval.ms": "5000",
        "table.poll.interval.ms": "120000"
    }
}'
The error that I get when submitting this request is as follows:
{
    "error_code": 400,
    "message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:sqlserver://<host>:<port>;databaseName=<db_name>; for configuration Couldn't open connection to jdbc:sqlserver://<host>:<port>;databaseName=<db_name>;\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:sqlserver://<host>:<port>;databaseName=<db_name;> for configuration Couldn't open connection to jdbc:sqlserver://<host>:<port>;databaseName=<db_name;>\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}
Any help you can provide is highly appreciated.
Thanks
Credit for the answer goes to @rmoff for pointing me in the right direction.
The issue lay in two places.
1. I had overridden the plugin path through CONNECT_PLUGIN_PATH. There is nothing wrong with doing that, but it is generally not a good idea, because you then have to copy all the base plugins that ship with the Confluent Platform yourself. This can become a problem when you move to a new version, as you might have to go through the same process again.
2. The SQL Server JDBC driver has to sit in the same directory as kafka-connect-jdbc-<confluent-version>.jar, which in my case is kafka-connect-jdbc-4.0.0.jar.
Once these two points were addressed, my SQL Server JDBC driver started working as expected.
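For anyone who wants to script the driver placement, here is a minimal sketch of one way to copy a driver jar into the directory that already holds the kafka-connect-jdbc jar. The function name install_jdbc_driver, the driver file name, and the search root are my own assumptions for illustration and are not part of Kafka Connect or the Confluent images, so adjust them to your container layout.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Kafka Connect): copy a JDBC driver jar into
# the directory that already contains kafka-connect-jdbc-<version>.jar.
install_jdbc_driver() {
  local driver_jar="$1"   # path to the driver jar (file name is an assumption)
  local search_root="$2"  # directory tree to search for the connector jar
  local connector_jar

  # Locate the connector jar; if several versions exist, the first match wins.
  connector_jar="$(find "$search_root" -type f -name 'kafka-connect-jdbc-*.jar' | head -n 1)"
  if [ -z "$connector_jar" ]; then
    echo "kafka-connect-jdbc jar not found under $search_root" >&2
    return 1
  fi

  # Copy the driver next to the connector jar, then print the target directory.
  cp "$driver_jar" "$(dirname "$connector_jar")/" || return 1
  dirname "$connector_jar"
}
```

A call might look like `install_jdbc_driver ./mssql-jdbc.jar /usr/share/java`, where both paths are placeholders. Connect scans for plugins when a worker starts, so expect to restart the workers before the driver is picked up.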