How to transform and extract fields in Kafka sink JDBC connector

I am using a 3rd party CDC tool that replicates data from a source database into Kafka topics. An example row is shown below:

{  
   "data":{  
      "USER_ID":{  
         "string":"1"
      },
      "USER_CATEGORY":{  
         "string":"A"
      }
   },
   "beforeData":{  
      "Data":{  
         "USER_ID":{  
            "string":"1"
         },
         "USER_CATEGORY":{  
            "string":"B"
         }
      }
   },
   "headers":{  
      "operation":"UPDATE",
      "timestamp":"2018-05-03T13:53:43.000"
   }
}

What configuration is needed in the sink connector file in order to extract all the (sub)fields under data and headers, ignore those under beforeData, and have the target table that the Kafka sink writes to contain the following fields:

USER_ID, USER_CATEGORY, operation, timestamp

I went through the transformation list in Confluent's docs, but I was not able to work out how to use them to achieve this.

Giorgos Myrianthous asked May 10 '18


2 Answers

I think you want ExtractField, but unfortunately it behaves like a Map.get operation, which means (1) nested fields cannot be extracted in one pass and (2) multiple fields require multiple transforms.

That being said, you might want to attempt this (untested):

transforms=ExtractData,ExtractHeaders
transforms.ExtractData.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractData.field=data
transforms.ExtractHeaders.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractHeaders.field=headers

If that doesn't work, you might be better off implementing your own Transformation that can at least drop values from the Struct / Map; a rough sketch of such a transform is shown below.
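
For what it's worth, here is a rough, untested sketch of what such a custom transform could look like, assuming the record values are Structs with a schema (for example when the Avro converter is used); the package and class names are made up purely for illustration:

package com.example.smt;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical custom SMT: copies the sub-fields of "data" and "headers" into a
// new flat value and drops everything else (e.g. "beforeData").
public class FlattenDataAndHeaders<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // This sketch only handles Struct values, i.e. records that carry a schema.
        Struct value = (Struct) record.value();
        Struct data = value.getStruct("data");
        Struct headers = value.getStruct("headers");

        // Build a flat schema from the sub-fields of "data" and "headers".
        SchemaBuilder builder = SchemaBuilder.struct().name("FlattenedValue");
        for (Field f : data.schema().fields()) {
            builder.field(f.name(), f.schema());
        }
        for (Field f : headers.schema().fields()) {
            builder.field(f.name(), f.schema());
        }
        Schema newSchema = builder.build();

        // Copy the values into the new flat Struct.
        Struct newValue = new Struct(newSchema);
        for (Field f : data.schema().fields()) {
            newValue.put(f.name(), data.get(f));
        }
        for (Field f : headers.schema().fields()) {
            newValue.put(f.name(), headers.get(f));
        }

        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(), newSchema, newValue, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef(); // no configuration options in this sketch
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}

The compiled class would then go on the Connect worker's plugin.path (or classpath) and be referenced from the sink configuration with something like transforms=FlattenDataAndHeaders and transforms.FlattenDataAndHeaders.type=com.example.smt.FlattenDataAndHeaders.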

OneCricketeer answered Oct 16 '22


If you're willing to list specific field names, you can solve this by:

  1. Using the Flatten transform to collapse the nesting (which converts the original structure's paths into dot-delimited names)
  2. Using a ReplaceField transform with renames to rename the fields to what you want the sink to emit
  3. Using another ReplaceField transform with whitelist to limit the emitted fields to those you selected

For your case it might look like:

  "transforms": "t1,t2,t3",
  "transforms.t1.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.t2.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.t2.renames": "data.USER_ID:USER_ID,data.USER_CATEGORY:USER_CATEGORY,headers.operation:operation,headers.timestamp:timestamp",
  "transforms.t3.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.t3.whitelist": "USER_ID,USER_CATEGORY,operation,timestamp",
Marty Woodlee answered Oct 16 '22