I have a running Debezium cluster in AWS with no issues. I wanted to try AWS MSK, so I launched an MSK cluster and then an EC2 instance to run my connectors.
Then I installed confluent-kafka:
sudo apt-get update && sudo apt-get install confluent-platform-2.12
By default, AWS MSK doesn't include a schema registry, so I configured one on the connector EC2 instance. Schema registry conf file:
kafkastore.connection.url=z-1.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-3.bhuvi-XXXXXXXXX.amazonaws.com:2181,z-2.bhuvi-XXXXXXXXX.amazonaws.com:2181
kafkastore.bootstrap.servers=PLAINTEXT://b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,PLAINTEXT://b-1.bhuvi-XXXXXXXXX.amazonaws.com:9092
Then the /etc/kafka/connect-distributed.properties file:
bootstrap.servers=b-4.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-3.bhuvi-XXXXXXXXX.amazonaws.com:9092,b-2.bhuvi-XXXXXXXXX.amazonaws.com:9092
plugin.path=/usr/share/java,/usr/share/confluent-hub-components
confluent-hub install debezium/debezium-connector-mysql:latest
systemctl start confluent-schema-registry
systemctl start confluent-connect-distributed
Now everything started. Then I created a mysql.json file.
{
  "name": "mysql-connector-db01",
  "config": {
    "name": "mysql-connector-db01",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.server.id": "1",
    "tasks.max": "3",
    "database.history.kafka.bootstrap.servers": "172.31.47.152:9092,172.31.38.158:9092,172.31.46.207:9092",
    "database.history.kafka.topic": "schema-changes.mysql",
    "database.server.name": "mysql-db01",
    "database.hostname": "172.31.84.129",
    "database.port": "3306",
    "database.user": "bhuvi",
    "database.password": "my_stong_password",
    "database.whitelist": "proddb,test",
    "internal.key.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.source.fields": "ts_ms"
  }
}
curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors -d @mysql.json
Then it started logging this error on the connector EC2 instance:
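The same REST call can be made from Python, which also lets the file be parsed locally before anything is sent. This is only a sketch using the standard library: the function names are hypothetical, and the worker address http://localhost:8083 is taken from the curl command.

```python
import json
import urllib.request


def load_connector(path):
    """Parse the connector definition; raises json.JSONDecodeError on bad syntax."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


def build_connector_request(connector, base_url="http://localhost:8083"):
    """Build (but do not send) the POST request for the Kafka Connect REST API."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


def create_connector(path, base_url="http://localhost:8083"):
    """Send the request and return the worker's decoded JSON response."""
    request = build_connector_request(load_connector(path), base_url)
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling create_connector("mysql.json") on the connector instance would submit the definition; a syntax error in the file is raised by load_connector before any request is made.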
Dec 20 11:42:36 ip-172-31-44-220 connect-distributed[2630]: [2019-12-20 11:42:36,290] WARN [Producer clientId=producer-3] Got error produce response with correlation id 844 on topic-partition connect-configs-0, retrying (2147482809 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)
Dec 20 11:42:36 ip-172-31-44-220 connect-distributed[2630]: [2019-12-20 11:42:36,391] WARN [Producer clientId=producer-3] Got error produce response with correlation id 845 on topic-partition connect-configs-0, retrying (2147482808 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)
Dec 20 11:42:36 ip-172-31-44-220 connect-distributed[2630]: [2019-12-20 11:42:36,492] WARN [Producer clientId=producer-3] Got error produce response with correlation id 846 on topic-partition connect-configs-0, retrying (2147482807 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)
Dec 20 11:42:36 ip-172-31-44-220 connect-distributed[2630]: [2019-12-20 11:42:36,593] WARN [Producer clientId=producer-3] Got error produce response with correlation id 847 on topic-partition connect-configs-0, retrying (2147482806 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender:637)
The error message never stops.
Output of describing the connect-configs topic:
Topic:connect-configs PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic: connect-configs Partition: 0 Leader: 2 Replicas: 2 Isr: 2
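The two values worth reading from that output are the replication factor and the in-sync replica (Isr) list. As a rough illustration, they can be pulled out with a small parser; the function name is hypothetical and the regular expressions assume the output layout shown above.

```python
import re


def parse_describe(text):
    """Extract the replication factor and per-partition replica info
    from kafka-topics --describe style output."""
    summary = re.search(r"ReplicationFactor:\s*(\d+)", text)
    partition_pattern = re.compile(
        r"Partition:\s*(\d+)\s+Leader:\s*(\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]+)"
    )
    partitions = {}
    for match in partition_pattern.finditer(text):
        partitions[int(match.group(1))] = {
            "leader": int(match.group(2)),
            "replicas": [int(b) for b in match.group(3).split(",")],
            "isr": [int(b) for b in match.group(4).split(",")],
        }
    return {
        "replication_factor": int(summary.group(1)) if summary else None,
        "partitions": partitions,
    }


DESCRIBE_OUTPUT = """\
Topic:connect-configs PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic: connect-configs Partition: 0 Leader: 2 Replicas: 2 Isr: 2
"""

info = parse_describe(DESCRIBE_OUTPUT)
# One replica (broker 2), which is also the only member of the ISR.
print(info["replication_factor"], info["partitions"][0]["isr"])
```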
MSK sets min.insync.replicas to 2 for all topics by default (see https://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html).
It is possible that Kafka Connect is producing with acks=all and, since your topic has only one replica, the write can never be acknowledged by enough in-sync replicas.
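To make the reasoning concrete, here is a simplified model of the check a broker applies to a produce request. It is an illustration only, not Kafka's actual code, and the function name is hypothetical; the values come from the topic description in the question and the MSK default mentioned above.

```python
def produce_outcome(acks, isr_size, min_insync_replicas):
    """Simplified model of the broker-side check for a produce request.

    min.insync.replicas is only enforced when the producer asks for
    acknowledgement from all in-sync replicas (acks=all, also written -1).
    """
    if str(acks) in ("all", "-1") and isr_size < min_insync_replicas:
        return "NOT_ENOUGH_REPLICAS"
    return "OK"


# connect-configs has a single replica, so its ISR can never exceed 1.
print(produce_outcome("all", isr_size=1, min_insync_replicas=2))
# A topic with three in-sync replicas passes the same check.
print(produce_outcome("all", isr_size=3, min_insync_replicas=2))
# With acks=1 the minimum is not enforced at all.
print(produce_outcome("1", isr_size=1, min_insync_replicas=2))
```

If this model matches what is happening, the topic needs at least two in-sync replicas (or a lower min.insync.replicas) before the writes can succeed. For Connect's internal topics the replication factor comes from the worker settings config.storage.replication.factor, offset.storage.replication.factor and status.storage.replication.factor, which only take effect when the topics are first created.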