Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

kafka s3 sink connector crashed when It gets NULL data

I had a working s3 sink connector until the source connector sent a NULL value; s3 connector crashed. The problem occured when I deleted a record from MS SQL db. The source connector shipped the deletion information to s3 connector and s3 connector crashed. I deleted and recreated s3 connector with a different name, nothing changed.

    org.apache.kafka.connect.errors.ConnectException: Null valued records are not writeable with current behavior.on.null.values 'settings.
        at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:91)
        at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
        at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
        at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2020-05-24 10:10:50,577 WARN WorkerSinkTask{id=minio-connector1-0} Ignoring invalid task provided offset filesql1.dbo.Files-0/OffsetAndMetadata{offset=16, leaderEpoch=null, metadata=''} -- not yet consumed, taskOffset=16 currentOffset=0 (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-minio-connector1-0]
2020-05-24 10:10:50,577 ERROR WorkerSinkTask{id=minio-connector1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-minio-connector1-0]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Null valued records are not writeable with current behavior.on.null.values 'settings.
        at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:91)
        at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
        at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
        at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
        ... 10 more
2020-05-24 10:10:50,577 ERROR WorkerSinkTask{id=minio-connector1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-minio-connector1-0]

...and this is my s3 connector config:

    apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: "minio-connector1"
  labels:
    strimzi.io/cluster: mssql-minio-connect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    storage.class: io.confluent.connect.s3.storage.S3Storage  
    partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
    tasks.max: '1'
    topics: filesql1.dbo.Files
    s3.bucket.name: dosyalar
    s3.part.size: '5242880'
    flush.size: '2'
    format: binary
    schema.compatibility: NONE
    max.request.size: "536870912"
    store.url: http://minio.dev-kik.io
    format.class: io.confluent.connect.s3.format.avro.AvroFormat
    key.converter: io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081
    internal.key.converter: org.apache.kafka.connect.json.JsonConverter
    internal.value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    schema.generator.class: io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator

I have 2 questions:

1) How can I make s3 connector runing again?

2) It cannot be expected not to delete records from source database. How can I prevent s3 connector's crash again?

like image 830
Tireli Efe Avatar asked May 24 '20 10:05

Tireli Efe


People also ask

How do I start the Kafka S3 connector in confluent?

To start the Kafka S3 connector, use the Confluent CLI to make a REST API call as follows: confluent local load meetups-to-s3 -- -d ./meetups-to-s3.json You can also use the curl command to perform the same operation as follows: confluent local load meetups-to-s3 -- -d ./meetups-to-s3.json

How do I load data from kafkatos3 to Amazon S3?

Using Confluent’s in-built Kafka S3 connector to load data from KafkatoS3 is one such way. Kafka allows users to transfer their data to a destination of their choice such as AmazonS3 by using one of the connectors provided by Confluent Hub.

What is Kafka and how does it work?

Written in Scala, Kafka supports bringing in data from a large variety of sources and stores them in the form of “topics” by processing the information stream. It uses two functions, namely Producers, which act as an interface between the data source and Kafka Topics, and Consumers, which allow users to read and transfer the data stored in Kafka.


1 Answers

please take a look at connector documentation and look for behavior.on.null.values. You can set it to ignore.

like image 145
Jiri Pechanec Avatar answered Oct 12 '22 00:10

Jiri Pechanec