I am using a 3rd party CDC tool that replicates data from a source database into Kafka topics. An example row is shown below:
{
  "data": {
    "USER_ID": {
      "string": "1"
    },
    "USER_CATEGORY": {
      "string": "A"
    }
  },
  "beforeData": {
    "Data": {
      "USER_ID": {
        "string": "1"
      },
      "USER_CATEGORY": {
        "string": "B"
      }
    }
  },
  "headers": {
    "operation": "UPDATE",
    "timestamp": "2018-05-03T13:53:43.000"
  }
}
What configuration is needed in the sink file in order to extract all the (sub)fields under data and headers, and ignore those under beforeData, so that the target table into which the Kafka Sink connector writes the data contains the following fields:
USER_ID, USER_CATEGORY, operation, timestamp
I went through the transformation list in Confluent's docs, but I was not able to work out how to use them to achieve this.
The JDBC connector for Kafka Connect is included with Confluent Platform and can also be installed separately from Confluent Hub. It enables you to pull data (source) from a database into Kafka, and to push data (sink) from a Kafka topic to a database.
The Kafka Connect JDBC Sink connector allows you to export data from Apache Kafka® topics to any relational database with a JDBC driver. This connector can support a wide variety of databases. The connector polls data from Kafka to write to the database based on the topics subscription.
Before you use the JDBC Sink connector you require the following: a database connection with a JDBC driver, a topic that is enabled for Kafka Connect (for example an Event Hub topic), and data in Avro format.
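To make those prerequisites concrete, a minimal sink configuration might look like the sketch below (untested; the connector name, topic, connection URL, and credentials are placeholders rather than anything from the question):
name=jdbc-sink-example
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# topic to read from (placeholder)
topics=users
# JDBC connection details (placeholders)
connection.url=jdbc:postgresql://localhost:5432/exampledb
connection.user=example_user
connection.password=example_password
# create the target table if it does not already exist
auto.create=true
insert.mode=insert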
Single Message Transformations (SMTs) are applied to messages as they flow through Connect. SMTs transform inbound messages after a source connector has produced them, but before they are written to Kafka. SMTs transform outbound messages before they are sent to a sink connector.
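As a sketch of how an SMT chain is declared on a connector, every alias listed under transforms gets its own transforms.&lt;alias&gt;.* settings; the alias and target field name below are purely illustrative:
transforms=addTopic
transforms.addTopic.type=org.apache.kafka.connect.transforms.InsertField$Value
# write the originating topic name into a field named "source_topic" (illustrative name)
transforms.addTopic.topic.field=source_topic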
The following provides usage information for the Apache Kafka® SMT org.apache.kafka.connect.transforms.ExtractField. ExtractField pulls a field out of a complex (non-primitive, Map or Struct) key or value and replaces the entire key or value with the extracted field.
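For example, a record key that is a Struct wrapping a single id field could be replaced by that field alone (illustrative alias and field name):
transforms=unwrapKey
transforms.unwrapKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.unwrapKey.field=id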
In traditional enterprise IT landscapes, it is a common problem that you need to push data from Kafka directly into some RDBMS (like Oracle, Postgres, MySQL, MSSQL, …). One common and Kafka-native tool for this is the Kafka JDBC Sink Connector.
If the Kafka Connect runtime tries to do its own data conversion, the resulting byte array sent to the collector is no longer the untainted data sent originally by the producer. No matter what data type your records are (Avro, Protobuf, etc.), always choose the ByteArray converter and let the collector resolve the data with its own schemas.
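If you do take that route, the converter can be overridden per connector rather than only at the worker level; a minimal sketch:
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
Note, though, that the JDBC sink discussed above needs schema-aware records (e.g. Avro with Schema Registry), so this pass-through approach only suits sinks that can interpret the raw bytes themselves.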
I think you want ExtractField, and unfortunately it's a Map.get operation, so that means 1) nested fields cannot be retrieved in one pass and 2) multiple fields need multiple transforms.
That being said, you might want to attempt this (untested):
transforms=ExtractData,ExtractHeaders
transforms.ExtractData.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractData.field=data
transforms.ExtractHeaders.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractHeaders.field=headers
If that doesn't work, you might be better off implementing your own Transformations package that can at least drop values from the Struct / Map.
If you're willing to list specific field names, you can solve this by:
using Flatten to collapse the nesting (which turns the nested paths into dot-delimited names such as data.USER_ID),
using ReplaceField with renames to make the field names be what you want the sink to emit, and
using ReplaceField with a whitelist to limit the emitted fields to those you select.
For your case it might look like:
"transforms": "t1,t2,t3",
"transforms.t1.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.t2.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t2.renames": "data.USER_ID:USER_ID,data.USER_CATEGORY:USER_CATEGORY,headers.operation:operation,headers.timestamp:timestamp",
"transforms.t3.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t3.whitelist": "USER_ID,USER_CATEGORY,operation,timestamp",