I have a Kafka cluster already running in production with a topic named "existing-topic". I am using the Debezium MongoDB source connector.
All I want is to push the CDC events directly to "existing-topic" so that my consumers, which are already listening to that topic, will process them.
I couldn't find any resource explaining how to do this. The only thing mentioned is that the topic is created in the following format:
"If your mongodb.name parameter is A, database name is B and collection name is C, the data from database B and collection C will be loaded under the topic A.B.C"
Can I change the topic to "existing-topic" and push the events to it?
According to the documentation:
"The name of the Kafka topics always takes the form logicalName.databaseName.collectionName, where logicalName is the logical name of the connector as specified with the mongodb.name configuration property, databaseName is the name of the database where the operation occurred, and collectionName is the name of the MongoDB collection in which the affected document existed."
This means that if your connector's logical name is myConnector, your database myDatabase has two collections, users and orders, and you use a configuration such as:
{
  "name": "myConnector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "mongo-db-host:27017",
    "mongodb.name": "myConnector",
    "collection.whitelist": "myDatabase[.].*"
  }
}
then Kafka Connect will populate two topics with names:
myConnector.myDatabase.users
myConnector.myDatabase.orders
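The naming rule and the collection filter above can be mimicked in a few lines of Python. This is only a sketch, not Debezium's code: it assumes each collection.whitelist regex must match the whole "database.collection" namespace, and the helpers topic_for and topics_for are hypothetical names used for illustration.

```python
import re
from typing import Iterable, List


def topic_for(logical_name: str, database: str, collection: str) -> str:
    """Build a topic name in the form logicalName.databaseName.collectionName."""
    return f"{logical_name}.{database}.{collection}"


def topics_for(config: dict, namespaces: Iterable[str]) -> List[str]:
    """Return the topics a connector with `config` would write to.

    Assumption (hypothetical model): each comma-separated regex in
    collection.whitelist must match the ENTIRE "database.collection" namespace.
    """
    logical_name = config["mongodb.name"]
    patterns = [re.compile(p.strip()) for p in config["collection.whitelist"].split(",")]
    topics = []
    for ns in namespaces:
        if any(p.fullmatch(ns) for p in patterns):
            # MongoDB database names cannot contain dots, so split on the first one.
            database, collection = ns.split(".", 1)
            topics.append(topic_for(logical_name, database, collection))
    return topics


config = {
    "mongodb.name": "myConnector",
    "collection.whitelist": "myDatabase[.].*",
}
print(topics_for(config, ["myDatabase.users", "myDatabase.orders", "otherDb.logs"]))
# ['myConnector.myDatabase.users', 'myConnector.myDatabase.orders']
```

Under the whole-match assumption, a pattern like "myDatabase[.]*" (zero or more dots) would not match "myDatabase.users", which is why the sketch uses "myDatabase[.].*".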
If you still want to change the name of the target topic, you can use Kafka Connect Single Message Transforms (SMTs). More precisely, the Confluent-provided ExtractTopic SMT should help you. Note, though, that this SMT extracts the topic name from the key or value of the message, so the desired topic name has to be included in the payload somehow.
For example, the following SMT will extract the value of the field myField and use it as the record's topic:
transforms=ValueFieldExample
transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ValueFieldExample.field=myField
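To make the idea concrete, here is a plain-Python sketch of what an ExtractTopic-style transform does to a record: read a field from the value and use it as the new topic. The Record class and extract_topic_from_value function are hypothetical; the value is a flat dict for simplicity, and the sketch does not reproduce Confluent's implementation (for example, how it treats missing or null fields).

```python
from dataclasses import dataclass, replace
from typing import Any, Dict


@dataclass(frozen=True)
class Record:
    """Hypothetical stand-in for a Kafka Connect source record."""
    topic: str
    key: Any
    value: Dict[str, Any]


def extract_topic_from_value(record: Record, field: str) -> Record:
    """Return a copy of `record` whose topic is taken from value[field].

    The desired topic name must already be present in the payload;
    this sketch simply raises KeyError if the field is absent.
    """
    new_topic = str(record.value[field])
    return replace(record, topic=new_topic)


rec = Record(
    topic="myConnector.myDatabase.users",
    key={"id": 1},
    value={"myField": "existing-topic", "name": "Alice"},
)
print(extract_topic_from_value(rec, "myField").topic)  # existing-topic
```

The original record is left untouched; only the returned copy carries the new topic, which mirrors how an SMT emits a modified record rather than mutating the input.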
I was facing the same problem with the JDBC Source Connector and found a different solution:
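Using the RegexRouter Single Message Transform (given the alias dropPrefix here), you can override the whole topic name. RegexRouter only renames a topic when regex matches the entire name, so set regex to the whole generated topic name (with the dots escaped so they match literally) and replacement to the whole existing topic name:
"transforms": "dropPrefix",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex": "A\\.B\\.C",
"transforms.dropPrefix.replacement": "existing-topic"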
Because regex is a real regular expression, this also works if you capture multiple tables/collections and the generated topic names are not constant: for example, "A\\.B\\..*" would route every collection of database B to existing-topic.
It's a bit hacky, since technically I'm dropping the whole topic name and then adding a new one, which to me isn't the best solution.
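The renaming behaviour described in this answer can be modelled in Python. This sketch assumes, as in Apache Kafka's RegexRouter, that a topic is renamed only when the pattern matches the whole name and that the replacement may use Java-style group references such as $1. The regex_router function is a hypothetical illustration, and it handles only $n references, not other Java replacement escapes.

```python
import re


def regex_router(topic: str, regex: str, replacement: str) -> str:
    """Rename `topic` only if `regex` matches the WHOLE topic name.

    Java-style group references ($1, $2, ...) in `replacement` are translated
    to Python's \\g<n> syntax before expansion.
    """
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # no whole-name match: topic left unchanged
    py_template = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(py_template)


print(regex_router("A.B.C", r"A\.B\.C", "existing-topic"))        # existing-topic
print(regex_router("A.B.orders", r"A\.B\..*", "existing-topic"))  # existing-topic
print(regex_router("A.B.users", r"A\.B\.(.*)", "existing-$1"))    # existing-users
print(regex_router("A.B.C.extra", r"A\.B\.C", "existing-topic"))  # A.B.C.extra
```

The last call shows why the pattern has to cover the whole generated name. Note also that an unescaped "A.B.C" would match a name like "AxBxC" too, since an unescaped dot matches any character.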