I have error with my kafka producer. I use Debezium Kafka connectors V1.1.0 Final and Kafka 2.4.1 . For tables with pk, all tables are flushed clearly, but unfortunately for tables with no pk on it, it give me this error:
[2020-04-14 10:00:00,096] INFO Exporting data from table 'public.table_0' (io.debezium.relational.RelationalSnapshotChangeEventSource:280)
[2020-04-14 10:00:00,097] INFO For table 'public.table_0' using select statement: 'SELECT * FROM "public"."table_0"' (io.debezium.relational.RelationalSnapshotChangeEventSource:287)
[2020-04-14 10:00:00,519] INFO Finished exporting 296 records for table 'public.table_0'; total duration '00:00:00.421' (io.debezium.relational.RelationalSnapshotChangeEventSource:330)
[2020-04-14 10:00:00,522] INFO Snapshot - Final stage (io.debezium.pipeline.source.AbstractSnapshotChangeEventSource:79)
[2020-04-14 10:00:00,523] INFO Snapshot ended with SnapshotResult [status=COMPLETED, offset=PostgresOffsetContext [sourceInfo=source_info[server='postgres'db='xxx, lsn=38/C74913C0, txId=4511542, timestamp=2020-04-14T02:00:00.517Z, snapshot=FALSE, schema=public, table=table_0], partition={server=postgres}, lastSnapshotRecord=true]] (io.debezium.pipeline.ChangeEventSourceCoordinator:90)
[2020-04-14 10:00:00,524] INFO Connected metrics set to 'true' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics:59)
[2020-04-14 10:00:00,526] INFO Starting streaming (io.debezium.pipeline.ChangeEventSourceCoordinator:100)
[2020-04-14 10:00:00,550] ERROR WorkerSourceTask{id=pg_dev_pinjammodal-0} failed to send record to table_0: (org.apache.kafka.connect.runtime.WorkerSourceTask:347)
org.apache.kafka.common.InvalidRecordException: This record has failed the validation on broker and hence be rejected.
I have check the tables and it seem valid record. I set my producer producer.ack=1
in my config. Is this config trigger the invalidity in here?
The problem was creating Kafka topics with log compaction for non-PK tables, which need keys. The messages don't have keys, because the tables don't have PKs. This results in the brokers not being able to validate the Kafka messages.
The solution is to not set log compaction to the topics and/or not pre-creating those topics. Another option would be to add PKs to the tables.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With