I had a working s3 sink connector until the source connector produced a NULL value and the s3 connector crashed. The problem occurred when I deleted a record from the MS SQL database: the source connector shipped the deletion information downstream, and the s3 connector crashed. I deleted and recreated the s3 connector with a different name; nothing changed.
org.apache.kafka.connect.errors.ConnectException: Null valued records are not writeable with current behavior.on.null.values 'settings.
at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:91)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-05-24 10:10:50,577 WARN WorkerSinkTask{id=minio-connector1-0} Ignoring invalid task provided offset filesql1.dbo.Files-0/OffsetAndMetadata{offset=16, leaderEpoch=null, metadata=''} -- not yet consumed, taskOffset=16 currentOffset=0 (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-minio-connector1-0]
2020-05-24 10:10:50,577 ERROR WorkerSinkTask{id=minio-connector1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-minio-connector1-0]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Null valued records are not writeable with current behavior.on.null.values 'settings.
at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:91)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
... 10 more
2020-05-24 10:10:50,577 ERROR WorkerSinkTask{id=minio-connector1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-minio-connector1-0]
...and this is my s3 connector config:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: "minio-connector1"
  labels:
    strimzi.io/cluster: mssql-minio-connect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    storage.class: io.confluent.connect.s3.storage.S3Storage
    partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
    tasks.max: '1'
    topics: filesql1.dbo.Files
    s3.bucket.name: dosyalar
    s3.part.size: '5242880'
    flush.size: '2'
    format: binary
    schema.compatibility: NONE
    max.request.size: "536870912"
    store.url: http://minio.dev-kik.io
    format.class: io.confluent.connect.s3.format.avro.AvroFormat
    key.converter: io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081
    internal.key.converter: org.apache.kafka.connect.json.JsonConverter
    internal.value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    schema.generator.class: io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
I have 2 questions:
1) How can I get the s3 connector running again?
2) Deleting records from the source database cannot be avoided. How can I prevent the s3 connector from crashing like this again?
Please take a look at the connector documentation and look for behavior.on.null.values. You can set it to ignore.
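A minimal sketch of the fix against the Strimzi config from the question (assuming the Confluent S3 sink's behavior.on.null.values setting, whose default fail is exactly what threw the ConnectException above):

spec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    # Skip null-valued (tombstone) records instead of failing the task.
    # The default is "fail", which produced the crash in the question.
    behavior.on.null.values: ignore

Note that, per the log line "Task is being killed and will not recover until manually restarted", the failed task still needs a manual restart after the config change, for example via the Kafka Connect REST API (POST /connectors/minio-connector1/tasks/0/restart) or by recreating the KafkaConnector resource.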