
Debezium Postgres sink connector fails to insert values with type DATE

After setting up both the source and sink connectors, I run into problems with Postgres columns of type DATE.

ERROR: column "foo" is of type date but expression is of type integer

I checked the Avro schema and saw that column foo was serialized as io.debezium.time.Date:

{
    "default": null,
    "name": "foo",
    "type": [
        "null",
        {
            "connect.name": "io.debezium.time.Date",
            "connect.version": 1,
            "type": "int"
        }
    ]
}

What should I do to make the sink connector insert these values correctly (as DATE, not INTEGER)?

Full stacktrace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:89)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
    ... 10 more
Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249

    ... 12 more

Source config:

{
    "name": "dbz-source-test-1",
    "config": {
        "name":"dbz-source-test-1",
        "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname":"some.host",
        "database.port":"5432",
        "database.user":"test_debezium",
        "database.password":"password",
        "database.dbname":"dbname",
        "plugin.name":"wal2json_rds",
        "slot.name":"wal2json_rds",
        "database.server.name":"server_test",
        "table.whitelist":"public.test_table",
        "transforms":"route",
        "transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex":"([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement":"dbz_source_$3",
        "topic.selection.strategy":"topic_per_table",
        "include.unknown.datatypes":true,
        "decimal.handling.mode":"double",
        "snapshot.mode":"never"
    }
}

Sink config:

{
    "name": "dbz-sink-test-1",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "config.providers" : "file",
        "config.providers.file.class" : "org.apache.kafka.common.config.provider.FileConfigProvider",
        "config.providers.file.param.secrets" : "/opt/mysecrets",
        "topics": "dbz_source_test_table",
        "connection.url": "someurl",
        "connection.user": "${file:/opt/mysecrets.properties:user}",
        "connection.password" : "${file:/opt/mysecrets.properties:pass}",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "table.name.format": "dbz_source_",
        "insert.mode": "upsert",
        "pk.field": "id",
        "pk.mode": "record_value"
    }
}
bsiamionau asked Dec 13 '25 04:12


1 Answer

I fixed the problem by setting the source connector's time.precision.mode property to connect.

When the time.precision.mode configuration property is set to connect, then the connector will use the predefined Kafka Connect logical types. This may be useful when consumers only know about the built-in Kafka Connect logical types and are unable to handle variable-precision time values.
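One way to apply this setting is through the Kafka Connect REST API, which accepts a PUT to /connectors/{name}/config. The Python sketch below only builds that request and does not send it. The host and port (localhost:8083) are an assumption about your setup, the helper name build_update_request is made up for illustration, and the config is trimmed to a few keys from the question.

```python
import json
import urllib.request

# Assumption: the Kafka Connect REST API listens on localhost:8083.
CONNECT_URL = "http://localhost:8083"
CONNECTOR_NAME = "dbz-source-test-1"  # connector name from the question

# Trimmed for brevity. In practice send the full source config, because
# PUT /connectors/{name}/config replaces the whole configuration.
source_config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "table.whitelist": "public.test_table",
    "time.precision.mode": "connect",
}


def build_update_request(base_url, name, config):
    """Build (but do not send) the PUT request that updates a connector config."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


request = build_update_request(CONNECT_URL, CONNECTOR_NAME, source_config)
print(request.get_method(), request.full_url)
```

Once the URL matches your environment, the request can be sent with urllib.request.urlopen(request). Records that were already written to the topic keep their old schema, so only new records are affected.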

After that change, the column is serialized with a different type:

{
    "default": null,
    "name": "foo",
    "type": [
        "null",
        {
            "connect.name": "org.apache.kafka.connect.data.Date",
            "connect.version": 1,
            "logicalType": "date",
            "type": "int"
        }
    ]
}

The sink connector is aware of the org.apache.kafka.connect.data.Date type and inserts it correctly.
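As far as I can tell, the payload itself does not change: both io.debezium.time.Date and org.apache.kafka.connect.data.Date carry the date as an int counting days since 1970-01-01, and only the schema name differs. That is why the failed INSERT in the stack trace contains 18577 for foo. A minimal Python sketch of that decoding, with a helper name made up for illustration:

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def days_since_epoch_to_date(days):
    """Decode an epoch-day integer into a calendar date."""
    return EPOCH + timedelta(days=days)


# 18577 is the value for foo in the failed INSERT from the stack trace.
decoded = days_since_epoch_to_date(18577)
print(decoded)  # 2020-11-11
```

So the sink was handed a valid date in both cases. It just could not tell that the integer was a date until the schema used the Kafka Connect logical type.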

bsiamionau answered Dec 16 '25 01:12


