 

Kafka producer cannot validate record without PK and returns InvalidRecordException

I have an error with my Kafka producer. I use the Debezium Kafka connectors v1.1.0.Final with Kafka 2.4.1. All tables with a PK are flushed cleanly, but unfortunately for tables with no PK, I get this error:

[2020-04-14 10:00:00,096] INFO   Exporting data from table 'public.table_0' (io.debezium.relational.RelationalSnapshotChangeEventSource:280)
[2020-04-14 10:00:00,097] INFO   For table 'public.table_0' using select statement: 'SELECT * FROM "public"."table_0"' (io.debezium.relational.RelationalSnapshotChangeEventSource:287)
[2020-04-14 10:00:00,519] INFO   Finished exporting 296 records for table 'public.table_0'; total duration '00:00:00.421' (io.debezium.relational.RelationalSnapshotChangeEventSource:330)
[2020-04-14 10:00:00,522] INFO Snapshot - Final stage (io.debezium.pipeline.source.AbstractSnapshotChangeEventSource:79)
[2020-04-14 10:00:00,523] INFO Snapshot ended with SnapshotResult [status=COMPLETED, offset=PostgresOffsetContext [sourceInfo=source_info[server='postgres'db='xxx, lsn=38/C74913C0, txId=4511542, timestamp=2020-04-14T02:00:00.517Z, snapshot=FALSE, schema=public, table=table_0], partition={server=postgres}, lastSnapshotRecord=true]] (io.debezium.pipeline.ChangeEventSourceCoordinator:90)
[2020-04-14 10:00:00,524] INFO Connected metrics set to 'true' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics:59)
[2020-04-14 10:00:00,526] INFO Starting streaming (io.debezium.pipeline.ChangeEventSourceCoordinator:100)
[2020-04-14 10:00:00,550] ERROR WorkerSourceTask{id=pg_dev_pinjammodal-0} failed to send record to table_0: (org.apache.kafka.connect.runtime.WorkerSourceTask:347)
org.apache.kafka.common.InvalidRecordException: This record has failed the validation on broker and hence be rejected.

I have checked the tables and the records seem valid. I set producer.acks=1 in my config. Could this setting be triggering the invalidity here?
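For context, the only producer override I set is in the Connect worker properties, roughly like this (file name and broker address are from my setup):

    # connect-distributed.properties (Kafka Connect worker config)
    bootstrap.servers=localhost:9092
    # producer.*-prefixed settings are passed through to the producers
    # that source tasks (like the Debezium connector) use
    producer.acks=1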

asked Apr 14 '20 02:04 by Yosafat Vincent Saragih

1 Answer

The problem was that the Kafka topics for the PK-less tables had been pre-created with log compaction enabled. Compacted topics require every message to have a key, but since those tables have no primary key, Debezium produces messages without keys. As a result, the broker cannot validate the records and rejects them.
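You can reproduce the broker-side rejection with the standard Kafka CLI tools (a minimal sketch, assuming a local broker on localhost:9092; the console producer sends null keys by default):

    # Create a compacted topic, like the pre-created Debezium topics:
    kafka-topics.sh --bootstrap-server localhost:9092 --create \
      --topic table_0 --partitions 1 --replication-factor 1 \
      --config cleanup.policy=compact

    # Produce a record with no key; the broker rejects it on validation
    # (Kafka 2.4 console producer still uses --broker-list):
    echo 'some-value' | kafka-console-producer.sh \
      --broker-list localhost:9092 --topic table_0
    # -> org.apache.kafka.common.InvalidRecordException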

The solution is to not enable log compaction on those topics, and/or to not pre-create them at all so the broker creates them with default settings. Another option would be to add PKs to the tables.
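Concretely, either of these should work (a sketch; the topic name comes from the question's logs, and the column name is hypothetical):

    # Option A: switch the pre-created topic away from compaction
    # (on older clusters you may need --zookeeper instead):
    kafka-configs.sh --bootstrap-server localhost:9092 --alter \
      --entity-type topics --entity-name table_0 \
      --add-config cleanup.policy=delete

    # Option B: if your Debezium version supports it, have the connector
    # synthesize a message key for the PK-less table via the
    # message.key.columns connector option, e.g.:
    #   "message.key.columns": "public.table_0:some_unique_column"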

answered Oct 20 '22 20:10 by Yosafat Vincent Saragih