
Is there a way to update the configuration of a running Kafka connector?

I have a running Kafka Debezium connector, and I want to update the values of the following parameters:

heartbeat.interval.ms
snapshot.mode

Current Config:

{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "snapshot.locking.mode": "minimal",
  "database.user": "cdc_user",
  "tasks.max": "3",
  "database.history.kafka.bootstrap.servers": "XXX:9092",
  "database.history.kafka.topic": "history.cdc.fkw.supply.marketplace.fk_sp_generic_checklist",
  "database.server.name": "cdc.fkw.supply.marketplace.fk_sp_generic_checklist",
  "heartbeat.interval.ms": "5000",
  "database.port": "3306",
  "table.whitelist": "fk_sp_generic_checklist.entity_checklist",
  "database.hostname": "xyzcloud.in",
  "database.password": "ACCSSDD",
  "database.history.kafka.recovery.poll.interval.ms": "5000",
  "name": "cdc.fkw.supply.marketplace1.fk_sp_generic_checklist.connector",
  "database.history.skip.unparseable.ddl": "true",
  "errors.tolerance": "all",
  "database.whitelist": "fk_sp_generic_checklist",
  "snapshot.mode": "when_needed"
}
This is what I tried:

curl --location --request PUT 'http://XX.XX.7/connectors/' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "cdc.fkw.supply.marketplace.fk_sp_generic_checklist.connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "abc.cloud.in",
        "database.port": "3306",
        "database.user": "XXXXX",
        "database.password": "XXXXX",
        "database.server.name": "cdc.fkw.supply.marketplace.fk_sp_generic_checklist",
        "database.whitelist": "pno",
        "table.whitelist": "fk_sp_generic_checklist.entity_checklist",
        "database.history.kafka.bootstrap.servers": "X9:9092",
        "database.history.kafka.topic": "history.cdc.fkw.supply.marketplace.fk_sp_generic_checklist",
        "include.schema.changes": "false",
        "snapshot.mode": "when_needed",
        "errors.tolerance": "all"
    }
}'

Output: {"error_code":405,"message":"HTTP 405 Method Not Allowed"}

I tried the request above, but it returns 405 Method Not Allowed.

Version:

{"version":"6.1.1-ccs","commit":"c209f70c6c2e52ae","kafka_cluster_id":"snBlf-kfTdCYWEO9IIEXTA"}
Shubham Saroj asked Sep 13 '25 19:09



1 Answer

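To update the configuration of an existing connector, use the PUT /connectors/(string:name)/config endpoint of the Kafka Connect REST interface. Your request returned 405 because /connectors/ itself only accepts GET and POST. The body for this endpoint is the flat configuration map, not the {"name": ..., "config": ...} wrapper used when creating a connector with POST. It also replaces the whole configuration, so send every property and change only the values you need (here heartbeat.interval.ms and snapshot.mode).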

curl -XPUT <host>:<port>/connectors/cdc.fkw.supply.marketplace1.fk_sp_generic_checklist.connector/config \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' -d '{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "snapshot.locking.mode": "minimal",
  "database.user": "cdc_user",
  "tasks.max": "3",
  "database.history.kafka.bootstrap.servers": "XXX:9092",
  "database.history.kafka.topic": "history.cdc.fkw.supply.marketplace.fk_sp_generic_checklist",
  "database.server.name": "cdc.fkw.supply.marketplace.fk_sp_generic_checklist",
  "heartbeat.interval.ms": "5000",
  "database.port": "3306",
  "table.whitelist": "fk_sp_generic_checklist.entity_checklist",
  "database.hostname": "xyzcloud.in",
  "database.password": "ACCSSDD",
  "database.history.kafka.recovery.poll.interval.ms": "5000",
  "name": "cdc.fkw.supply.marketplace1.fk_sp_generic_checklist.connector",
  "database.history.skip.unparseable.ddl": "true",
  "errors.tolerance": "all",
  "database.whitelist": "fk_sp_generic_checklist",
  "snapshot.mode": "when_needed"
}'
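
If you would rather not retype the whole configuration by hand, the same update can be scripted: read the current config, overlay the changed values, and PUT the merged map back. The following is only a minimal sketch using the Python standard library. The function names fetch_config and build_config_update are hypothetical helpers, and the worker address is an assumption you would replace with your own.

```python
import json
import urllib.request
from urllib.parse import quote


def _config_url(base_url, name):
    # /connectors/{name}/config is the per-connector config endpoint.
    return f"{base_url.rstrip('/')}/connectors/{quote(name, safe='')}/config"


def fetch_config(base_url, name):
    """GET the connector's current flat config map from the Connect worker."""
    with urllib.request.urlopen(_config_url(base_url, name)) as resp:
        return json.load(resp)


def build_config_update(base_url, name, current_config, changes):
    """Build (without sending) the PUT request that replaces the config."""
    # PUT replaces the whole config, so start from the current settings
    # and overlay only the values that should change.
    merged = {**current_config, **changes}
    # The body is the flat map, not wrapped in {"name": ..., "config": ...}.
    return urllib.request.Request(
        _config_url(base_url, name),
        data=json.dumps(merged).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
        method="PUT",
    )
```

To apply it, call fetch_config with your worker address (for example http://<host>:<port>) and the connector name, pass the result to build_config_update together with a dict such as {"heartbeat.interval.ms": "10000"} (a placeholder value), and send the returned request with urllib.request.urlopen.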
Iskuskov Alexander answered Sep 15 '25 12:09
