
How to connect Apache Kafka with Amazon S3?

I want to store data from Kafka in an S3 bucket using Kafka Connect. I already have a Kafka topic running and an S3 bucket created. My topic contains Protobuf data. I tried https://github.com/qubole/streamx and got the following error:

 [2018-10-04 13:35:46,512] INFO Revoking previously assigned partitions [] for group connect-s3-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
 [2018-10-04 13:35:46,512] INFO (Re-)joining group connect-s3-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
 [2018-10-04 13:35:46,645] INFO Successfully joined group connect-s3-sink with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:434)
 [2018-10-04 13:35:46,692] INFO Setting newly assigned partitions [ssp.impressions-11, ssp.impressions-10, ssp.impressions-7, ssp.impressions-6, ssp.impressions-9, ssp.impressions-8, ssp.impressions-3, ssp.impressions-2, ssp.impressions-5, ssp.impressions-4, ssp.impressions-1, ssp.impressions-0] for Group connect-s3-sink(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)
 [2018-10-04 13:35:47,193] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
 java.lang.NullPointerException
    at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:122)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:290)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2018-10-04 13:35:47,194] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2018-10-04 13:35:51,235] INFO Reflections took 6844 ms to scan 259 urls, producing 13517 keys and 95788 values (org.reflections.Reflections:229)

I followed these steps:

  1. I cloned the repository.
  2. mvn -DskipTests package
  3. nano config/connect-standalone.properties

    bootstrap.servers=ip-myip.ec2.internal:9092
    key.converter=com.qubole.streamx.ByteArrayConverter
    value.converter=com.qubole.streamx.ByteArrayConverter
    
  4. nano config/quickstart-s3.properties

    name=s3-sink 
    connector.class=com.qubole.streamx.s3.S3SinkConnector
    format.class=com.qubole.streamx.SourceFormat
    tasks.max=1
    topics=ssp.impressions
    flush.size=3
    s3.url=s3://myaccess_key:mysecret_key@mybucket/demo
    
  5. connect-standalone /etc/kafka/connect-standalone.properties quickstart-s3.properties

I would like to know whether what I did is correct, or whether there is another way to get data from Kafka into S3.

Eric Bellet asked Oct 03 '18


People also ask

Can Kafka read from S3?

Similar Hadoop-related tools can read from S3 and write events to Kafka as well. The problem with this approach is that you need to keep track of which files have been read so far, as well as handle partially read files.

What is Kafka S3 connector?

The S3 connector, currently available as a sink, allows you to export data from Kafka topics to S3 objects in either Avro or JSON format. In addition, for certain data layouts, the S3 connector exports data with exactly-once delivery semantics to consumers of the S3 objects it produces.
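
To make the format choice concrete, here is a hedged sketch of the relevant settings for the Confluent S3 sink (class names taken from its documented options; the bucket name is a placeholder):

    # choose one output format for the S3 objects
    format.class=io.confluent.connect.s3.format.json.JsonFormat
    # format.class=io.confluent.connect.s3.format.avro.AvroFormat
    storage.class=io.confluent.connect.s3.storage.S3Storage
    s3.bucket.name=mybucket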


1 Answer

You can use Kafka Connect to do this integration, with the Kafka Connect S3 connector.

Kafka Connect is part of Apache Kafka, and the S3 connector is an open-source connector available either standalone or as part of Confluent Platform.
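
As a rough sketch of what this looks like in practice, assuming the Confluent S3 sink connector (property names follow its documentation; the bucket, region, and topic values are placeholders):

    # s3-sink.properties -- minimal S3 sink configuration (sketch)
    name=s3-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=1
    topics=ssp.impressions
    s3.bucket.name=mybucket
    s3.region=us-east-1
    storage.class=io.confluent.connect.s3.storage.S3Storage
    format.class=io.confluent.connect.s3.format.json.JsonFormat
    # If the topic holds raw Protobuf bytes, ByteArrayFormat together with
    # ByteArrayConverter may be a closer fit than JsonFormat:
    # format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
    flush.size=3
    schema.compatibility=NONE

You would then run it with a standalone worker, for example:

    connect-standalone /etc/kafka/connect-standalone.properties s3-sink.properties

The connector picks up AWS credentials through the default AWS credentials provider chain (environment variables, ~/.aws/credentials, or an instance profile), so they generally do not need to be embedded in an s3.url the way streamx expects.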

For general information and examples of Kafka Connect, this series of articles might help:

  • https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/
  • https://www.confluent.io/blog/the-simplest-useful-kafka-connect-data-pipeline-in-the-world-or-thereabouts-part-2/
  • https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/

Disclaimer: I work for Confluent, and wrote the above blog articles.


April 2020: I have recorded a video showing how to use the S3 sink: https://rmoff.dev/kafka-s3-video

Robin Moffatt answered Oct 16 '22