
Updating a Debezium MySQL connector with table whitelist option

I'm using the Debezium (0.7.5) MySQL connector, and I'm trying to understand the best approach for updating its configuration when the table.whitelist option is set.

Let's say I create a connector, something like this:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://debezium-host/connectors/ -d '
{
  "name": "MyConnector",
  "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "connect.timeout.ms": "60000",
      "tasks.max": "1",
      "database.hostname": "myhost",
      "database.port": "3306",
      "database.user": "***",
      "database.password": "***",
      "database.server.id": "3227197",
      "database.server.name": "MyServer",
      "database.whitelist": "myDb",
      "table.whitelist": "myDb.table1,myDb.table2",
      "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
      "database.history.kafka.topic": "MyConnectorHistoryTopic",
      "max.batch.size": "1024",
      "snapshot.mode": "initial",
      "decimal.handling.mode": "double"
    }
}'

After some time (2 weeks), I need to add a new table (myDb.table3) to the table.whitelist option. This table is an old one; it was created before the connector.

What I tried was:

  • Paused the connector.
  • Deleted the history topic (maybe this was the problem?).
  • Updated the config via the REST API's update-config endpoint.
  • Resumed the connector.
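
For reference, the pause and resume steps map onto the Kafka Connect REST endpoints PUT /connectors/{name}/pause and PUT /connectors/{name}/resume. A minimal sketch, assuming a CONNECT_URL variable that points at the Kafka Connect host (a placeholder, not part of the original setup); when it is unset, the calls are only printed:

```shell
# Assumption: CONNECT_URL is the base URL of the Kafka Connect REST API,
# e.g. https://kafka-connect-host. When it is empty, nothing is sent.
CONNECT_URL="${CONNECT_URL:-}"
NAME="MyConnector"

# connect_put is a hypothetical helper; $1 is "pause" or "resume".
connect_put() {
  if [ -n "$CONNECT_URL" ]; then
    curl -s -X PUT "$CONNECT_URL/connectors/$NAME/$1"
  else
    echo "PUT /connectors/$NAME/$1"
  fi
}

connect_put pause
# ... the config update happens between these two calls ...
connect_put resume
```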

Update command via API:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" https://kafka-connect-host/connectors/MyConnector/config/ -d '
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "connect.timeout.ms": "60000",
  "tasks.max": "1",
  "database.hostname": "myhost",
  "database.port": "3306",
  "database.user": "***",
  "database.password": "***",
  "database.server.id": "3227197",
  "database.server.name": "MyServer",
  "database.whitelist": "myDb",
  "table.whitelist": "myDb.table1,myDb.table2,myDb.table3",
  "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
  "database.history.kafka.topic": "MyConnectorHistoryTopic",
  "max.batch.size": "1024",
  "snapshot.mode": "schema_only",
  "decimal.handling.mode": "double"
}'

But it didn't work, and maybe this isn't the best approach at all. In my other connectors I'm not using the table.whitelist option, so when I needed to listen to a new table, I didn't have this problem.

My last option, I think, would be to delete this connector and create another one with the new configuration that also listens to the new table (myDb.table3). The problem is that if I want the initial data from myDb.table3, I would have to create it with snapshot.mode set to initial, but I don't want to regenerate all the snapshot messages for the other tables (myDb.table1, myDb.table2).

japoneizo asked Nov 28 '18 01:11


2 Answers

Changes to the whitelist/blacklist config are not yet supported at this point. This is currently being worked on (see DBZ-175), and we hope to have preview support for this in one of the next releases. There's a pending PR for this, which needs a bit more work, though.

Until this has been implemented, your best option is to set up a new instance of the connector which only captures the additional tables you're interested in. This comes at the price of running two connectors (which both will maintain a binlog reader session), but it does the trick as long as you don't need to change your filter config too often.
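
As a rough sketch of that workaround, a second connector limited to myDb.table3 might be registered as below. The connector name, server id, logical server name, and history topic are made-up values; they differ from the first connector because each MySQL connector needs its own database.server.id, logical name, and history topic. CONNECT_URL is also an assumption (e.g. http://debezium-host); when it is unset, the payload is only printed.

```shell
# Assumption: CONNECT_URL is the base URL of the Kafka Connect REST API.
CONNECT_URL="${CONNECT_URL:-}"

# Hypothetical values: MyConnectorTable3, 3227198, MyServerTable3,
# MyConnectorTable3HistoryTopic. Everything else is copied from the question.
PAYLOAD='{
  "name": "MyConnectorTable3",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "myhost",
    "database.port": "3306",
    "database.user": "***",
    "database.password": "***",
    "database.server.id": "3227198",
    "database.server.name": "MyServerTable3",
    "database.whitelist": "myDb",
    "table.whitelist": "myDb.table3",
    "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
    "database.history.kafka.topic": "MyConnectorTable3HistoryTopic",
    "snapshot.mode": "initial",
    "decimal.handling.mode": "double"
  }
}'

if [ -n "$CONNECT_URL" ]; then
  curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
    "$CONNECT_URL/connectors/" -d "$PAYLOAD"
else
  printf '%s\n' "$PAYLOAD"
fi
```

Because only myDb.table3 is whitelisted, the initial snapshot covers that table alone. Note that its topic would be prefixed with the new logical server name rather than MyServer.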

Gunnar answered Oct 23 '22 03:10


With the latest version of Debezium Server, you can add the following config:

debezium.snapshot.new.tables=parallel

If you are using Debezium itself rather than Debezium Server, you can try this config value:

snapshot.new.tables=parallel

Note: Debezium Server is the one that supports Kinesis, Google Pub/Sub, and Apache Pulsar. I am using that, and its configuration is a bit different: I had to prepend "debezium." to each property.

Once this configuration is added, Debezium will create snapshots for any tables that are subsequently added to table.whitelist.
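
A sketch of what the updated connector config might look like, reusing the PUT request from the question with table3 added and the extra property set. It assumes a Debezium version whose MySQL connector recognizes snapshot.new.tables, which may not be true of 0.7.5. CONNECT_URL is a placeholder; when it is unset, the config is only printed.

```shell
# Assumption: CONNECT_URL is the base URL of the Kafka Connect REST API,
# e.g. https://kafka-connect-host.
CONNECT_URL="${CONNECT_URL:-}"

# Same config as the original connector, plus table3 and snapshot.new.tables.
# snapshot.mode is left as originally created (an assumption, not verified).
CONFIG='{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "connect.timeout.ms": "60000",
  "tasks.max": "1",
  "database.hostname": "myhost",
  "database.port": "3306",
  "database.user": "***",
  "database.password": "***",
  "database.server.id": "3227197",
  "database.server.name": "MyServer",
  "database.whitelist": "myDb",
  "table.whitelist": "myDb.table1,myDb.table2,myDb.table3",
  "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
  "database.history.kafka.topic": "MyConnectorHistoryTopic",
  "max.batch.size": "1024",
  "snapshot.mode": "initial",
  "snapshot.new.tables": "parallel",
  "decimal.handling.mode": "double"
}'

if [ -n "$CONNECT_URL" ]; then
  curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" \
    "$CONNECT_URL/connectors/MyConnector/config" -d "$CONFIG"
else
  printf '%s\n' "$CONFIG"
fi
```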

I cannot point you to the documentation, but I went through their code on GitHub and also tried it in practice, and it worked for me. Here is the link to the MySqlConnector code:

https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java

There, search for Field.create("snapshot.new.tables").

Personally, I feel that Debezium has a lot of features, but its documentation is scattered.

Pavan Kumar Aryasomayajulu answered Oct 23 '22 05:10