
How to get kafka message's headers in Kafka Connect Sink connector with MongoDB

How do I retrieve the incoming Kafka message headers with Kafka Connect, so that I can store them as additional data fields in MongoDB via the MongoDB Sink Connector?

I have a Kafka topic "PROJECT_EXAMPLE_TOPIC". As you can see below, I am already able to save the message timestamp, the incoming message data, and the Mongo document created/updated dates.

I assume there is a function somewhere to extract the headers.

Example kafka value

  // incoming kafka value
  {
    "msgId" : "exampleId"
  }
  1. How can I get the original header header_foo?

  //expected example
  {
  
    "_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
    "_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "message_source" : "mongo_connector",
    "message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
    "message_topic" : "PROJECT_EXAMPLE_TOPIC",
    "msgId" : "exampleId",
    "message_header_foo" : "header_foo_value"
   }


  2. How can I get all Kafka headers?
  //expected example
  {
    "_id" : ObjectId("5f83869c1ad2db246fa25a5a"),
    "_insertedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "_modifiedTS" : ISODate("2020-10-11T22:26:36.051Z"),
    "message_source" : "mongo_connector",
    "message_timestamp" : ISODate("2020-09-28T21:50:54.940Z"),
    "message_topic" : "PROJECT_EXAMPLE_TOPIC",
    "msgId" : "exampleId",
    "message_headers" : {
        "header_001" : "header_001_value",
        "header_002" : "header_002_value",
        ...
        "header_x" : "header_x_value"
    }
  }


Here is my configuration:

{
    "name": "sink-mongo-PROJECT-EXAMPLE",
    "config": {
      "topics": "PROJECT_EXAMPLE_TOPIC",
      "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
      "tasks.max": "1",
  
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schema.registry.url": "SCHEMA_REGISTRY_URL",
      "key.converter.schemas.enable": "false",
      "key.converter.basic.auth.credentials.source": "USER_INFO",
      "key.converter.basic.auth.user.info": "SCHEMA_REGISTRY_API_KEY_AND_SECRET",
  
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "SCHEMA_REGISTRY_URL",
      "value.converter.schemas.enable": "false",
      "value.converter.basic.auth.credentials.source": "USER_INFO",
      "value.converter.basic.auth.user.info": "SCHEMA_REGISTRY_API_KEY_AND_SECRET",
      "connection.uri": "PROJECT_REFERENTIAL_MONGO_URL",
      "database": "PROJECT_DB_NAME",
      "collection": "EXAMPLE",
      "max.num.retries": "3",
      "retries.defer.timeout": "5000",
  
  
      "key.projection.type": "none",
      "key.projection.list": "",
  
      "field.renamer.mapping": "[]",
      "field.renamer.regex": "[]",
  
      "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",
      "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
      "value.projection.list": "msgId",
      "value.projection.type": "whitelist",
      "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",
    
      "delete.on.null.values": "false",
    
      "max.batch.size": "0",
      "rate.limiting.timeout": "0",
      "rate.limiting.every.n": "0",
    
    
      "change.data.capture.handler": "",
  
      "errors.tolerance": "all",
      "errors.log.enable":true,
      "errors.log.include.messages":true,

      "transforms": "InsertSource,InsertTopic,InsertTimestamp",
      "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.InsertSource.static.field": "message_source",
      "transforms.InsertSource.static.value": "mongo_connector",
      "transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.InsertTopic.topic.field": "message_topic",
      "transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.InsertTimestamp.timestamp.field": "message_timestamp"

    }
  }
asked Nov 07 '22 by linuxidefix
1 Answer

This is a bit of an old question, but there is a third-party Single Message Transform (SMT) that can copy headers into fields on either the key or the value:

https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-transform-common/transformations/HeaderToField.html

It won't let you grab all headers at once, though: you have to specify by name each header you want to extract, along with its type.
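As a sketch, the transform could be appended to the existing transform chain in the sink configuration like this. The header name `header_foo`, the `STRING` type, and the target field `message_header_foo` are taken from the question's expected output; the `header.mappings` format (`<header>:<type>:<field>`) follows the linked documentation, so double-check it against the version you install, and make sure the transform's JAR is on the Connect worker's plugin path:

```
"transforms": "InsertSource,InsertTopic,InsertTimestamp,HeaderToField",
"transforms.HeaderToField.type": "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
"transforms.HeaderToField.header.mappings": "header_foo:STRING:message_header_foo"
```

Note that with `"value.projection.type": "whitelist"` limited to `msgId`, you may also need to adjust the projection so the newly added field is not stripped before the document is written.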

answered Nov 15 '22 by Matt Allwood