I have the following JSON on a topic that the JDBC connector publishes to:
{"APP_SETTING_ID":9,"APP_SETTING_NAME":"my_name","SETTING_KEY":"my_setting_key"}
Here's my connector configuration file:
name=data.app_setting
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
poll.interval.ms=500
tasks.max=4
mode=timestamp
query=SELECT APP_SETTING_ID, APP_SETTING_NAME, SETTING_KEY FROM MY_TABLE WITH (NOLOCK)
timestamp.column.name=LAST_MOD_DATE
topic.prefix=data.app_setting
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=APP_SETTING_ID
This does add a key, but it's also in JSON format, like
{"APP_SETTING_ID":9}
whereas I just want 9 to be the key instead of the whole map. In the database the column is stored as a Long value.
ValueToKey takes a list of fields within the value and sets the key to a mapping of those fields to their values.
I think you need a second transform to extract just one of those fields as the key:
transforms=ReplaceKey,ExtractKey
# Replaces the key with fields in the value. Creates a map for all listed fields
transforms.ReplaceKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.ReplaceKey.fields=APP_SETTING_ID
# Extracts a specific field from the key, assuming it's a map/struct type
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractKey.field=APP_SETTING_ID
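With both transforms applied, the key is the extracted Long, and because key.converter is the JsonConverter with schemas disabled it serializes as the bare number 9. As a quick sanity check you can read the topic with the console consumer and print the keys (a sketch, assuming the topic is named data.app_setting and a broker is reachable on localhost:9092):

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic data.app_setting \
  --from-beginning \
  --property print.key=true

Each line should then show the key and the JSON value separated by a tab, e.g. 9 followed by the record value.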