How do I retrieve the incoming headers from a Kafka message in Kafka Connect and store them as additional data fields in MongoDB using the MongoDB Sink Connector?
I have a Kafka topic, "PROJECT_EXAMPLE_TOPIC". As the configuration below shows, I can already save the message timestamp, the incoming message data, and the MongoDB document created/updated dates.
I guess there is a function somewhere to extract the headers.
Example Kafka value:
// incoming Kafka value
{
    "msgId" : "exampleId"
}
// expected document (a single header stored as its own field)
{
    "_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
    "_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "message_source" : "mongo_connector",
    "message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
    "message_topic" : "PROJECT_EXAMPLE_TOPIC",
    "msgId" : "exampleId",
    "message_header_foo" : "header_foo_value"
}
// expected document (alternative: all headers stored in a nested document)
{
    "_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
    "_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "message_source" : "mongo_connector",
    "message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
    "message_topic" : "PROJECT_EXAMPLE_TOPIC",
    "msgId" : "exampleId",
    "message_headers" : {
        "header_001" : "header_001_value",
        "header_002" : "header_002_value",
        ...
        "header_x" : "header_x_value"
    }
}
Here is my configuration:
{
    "name": "sink-mongo-PROJECT-EXAMPLE",
    "config": {
        "topics": "PROJECT_EXAMPLE_TOPIC",
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schema.registry.url": "SCHEMA_REGISTRY_URL",
        "key.converter.schemas.enable": "false",
        "key.converter.basic.auth.credentials.source": "USER_INFO",
        "key.converter.basic.auth.user.info": "SCHEMA_REGISTRY_API_KEY_AND_SECRET",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "SCHEMA_REGISTRY_URL",
        "value.converter.schemas.enable": "false",
        "value.converter.basic.auth.credentials.source": "USER_INFO",
        "value.converter.basic.auth.user.info": "SCHEMA_REGISTRY_API_KEY_AND_SECRET",
        "connection.uri": "PROJECT_REFERENTIAL_MONGO_URL",
        "database": "PROJECT_DB_NAME",
        "collection": "EXAMPLE",
        "max.num.retries": "3",
        "retries.defer.timeout": "5000",
        "key.projection.type": "none",
        "key.projection.list": "",
        "field.renamer.mapping": "[]",
        "field.renamer.regex": "[]",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",
        "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
        "value.projection.list": "msgId",
        "value.projection.type": "whitelist",
        "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",
        "delete.on.null.values": "false",
        "max.batch.size": "0",
        "rate.limiting.timeout": "0",
        "rate.limiting.every.n": "0",
        "change.data.capture.handler": "",
        "errors.tolerance": "all",
        "errors.log.enable": true,
        "errors.log.include.messages": true,
        "transforms": "InsertSource,InsertTopic,InsertTimestamp",
        "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertSource.static.field": "message_source",
        "transforms.InsertSource.static.value": "mongo_connector",
        "transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertTopic.topic.field": "message_topic",
        "transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertTimestamp.timestamp.field": "message_timestamp"
    }
}
This is a bit of an old question, but there is a third-party Single Message Transform (SMT) that can convert headers to fields on either the key or the value:
https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-transform-common/transformations/HeaderToField.html
It won't let you grab all headers, though: you have to specify by name the ones you want to extract, along with their types.
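As a rough sketch of how this could slot into the configuration from the question, the fragment below appends one more transform to the existing chain. Several things here are assumptions rather than tested facts: the alias HeaderFoo is made up, the header is assumed to be named foo and to hold a string, and the class name and the header.mappings format (header name, type, optional target field name) are as I understand them from the linked documentation, so check them against that page for the version you install. The plugin also has to be on the Connect worker's plugin path.

```json
{
    "transforms": "InsertSource,InsertTopic,InsertTimestamp,HeaderFoo",
    "transforms.HeaderFoo.type": "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
    "transforms.HeaderFoo.header.mappings": "foo:STRING:message_header_foo"
}
```

These keys would be merged into the existing "config" object, replacing the current "transforms" entry and keeping the other transforms.* settings as they are. If the mapping behaves as described, a header foo with the value header_foo_value should end up as the message_header_foo field in the first expected document.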