I want to store data from Kafka into a bucket s3 using Kafka Connect. I had already a Kafka's topic running and I had a bucket s3 created. My topic has data on Protobuffer, I tried with https://github.com/qubole/streamx and I obtained the next error:
[2018-10-04 13:35:46,512] INFO Revoking previously assigned partitions [] for group connect-s3-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
[2018-10-04 13:35:46,512] INFO (Re-)joining group connect-s3-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
[2018-10-04 13:35:46,645] INFO Successfully joined group connect-s3-sink with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:434)
[2018-10-04 13:35:46,692] INFO Setting newly assigned partitions [ssp.impressions-11, ssp.impressions-10, ssp.impressions-7, ssp.impressions-6, ssp.impressions-9, ssp.impressions-8, ssp.impressions-3, ssp.impressions-2, ssp.impressions-5, ssp.impressions-4, ssp.impressions-1, ssp.impressions-0] for Group connect-s3-sink(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)
[2018-10-04 13:35:47,193] ERROR Task s3-sink-0 threw an uncaught an unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.NullPointerException
at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:122)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:290)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2018-10-04 13:35:47,194] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2018-10-04 13:35:51,235] INFO Reflections took 6844 ms to scan 259 urls, producing 13517 keys and 95788 values (org.reflections.Reflections:229)
I did the next steps:
mvn DskipTests package
nano config/connect-standalone.properties
bootstrap.servers=ip-myip.ec2.internal:9092
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
nano config/quickstart-s3.properties
name=s3-sink
connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=com.qubole.streamx.SourceFormat tasks.max=1
topics=ssp.impressions
flush.size=3
s3.url=s3://myaccess_key:mysecret_key@mybucket/demo
connect-standalone /etc/kafka/connect-standalone.properties quickstart-s3.properties
I would like to know if that I did is okay or another way to keep data into S3 from Kafka.
simlar Hadoop related tools can read from S3 and write events to Kafka as well. The problems with this approach is that you need to keep track of what files have been read so far, as well as handle partially read files.
The S3 connector, currently available as a sink, allows you to export data from Kafka topics to S3 objects in either Avro or JSON formats. In addition, for certain data layouts, S3 connector exports data by guaranteeing exactly-once delivery semantics to consumers of the S3 objects it produces.
You can use Kafka Connect to do this integration, with the Kafka Connect S3 connector.
Kafka Connect is part of Apache Kafka, and the S3 connector is an open-source connector available either standalone or as part of Confluent Platform.
For general information and examples of Kafka Connect, this series of articles might help:
Disclaimer: I work for Confluent, and wrote the above blog articles.
April 2020: I have recorded a video showing how to use the S3 sink: https://rmoff.dev/kafka-s3-video
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With