
How to store a Kinesis stream in S3 using a specific folder structure within the bucket

I have events captured by a Kinesis stream. I want to put all events into a specific folder structure on S3, with one folder per date: all events from 15 June should go into one folder, events from 16 June should go into a new folder, and so on.

Being new to Kinesis, I am just going by the documentation. I found the connector framework, where S3Emitter is used with a configuration that picks the S3 location the data is emitted to. However, can somebody please suggest how to maintain a folder structure that captures events in date-wise folders?

Sam asked Jun 15 '14 16:06


3 Answers

I found a way to solve this issue and have posted the answer here: https://github.com/awslabs/amazon-kinesis-connectors/issues/24

Here is the answer again:

It is easy to achieve with the following changes to the sample code:

In S3sample.properties:

createS3Bucket = true

In S3Emitter.java:

/* Add the required imports */
import java.text.SimpleDateFormat;
import java.util.Calendar;

public class S3Emitter implements IEmitter {

    // Folder name built from the current date and hour, e.g. 2014_06_15_16.
    // Note: this is evaluated once, when the emitter instance is constructed.
    protected final String date_bucket = new SimpleDateFormat("yyyy_MM_dd_HH").format(Calendar.getInstance().getTime());

    public S3Emitter(KinesisConnectorConfiguration configuration) {
        // Append the date folder to the configured bucket name
        s3Bucket = configuration.S3_BUCKET + "/" + date_bucket;
    }
}

Hope this helps!
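One caveat with the snippet above: a field initializer runs once per emitter instance, so the folder name stays fixed until the worker restarts. A minimal sketch of one way around that is to compute the folder each time a buffer is flushed. DatedKeyBuilder and keyFor are hypothetical names for illustration and are not part of amazon-kinesis-connectors; the sketch assumes UTC days and reuses the "first sequence number, dash, last sequence number" file naming described for S3Emitter.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper (not part of amazon-kinesis-connectors). */
final class DatedKeyBuilder {

    // One folder per UTC day, e.g. 2014_06_15 (assumption: UTC, no hour component).
    private static final DateTimeFormatter DAY_FOLDER =
            DateTimeFormatter.ofPattern("yyyy_MM_dd").withZone(ZoneOffset.UTC);

    private DatedKeyBuilder() {
    }

    /**
     * Builds "<dayFolder>/<firstSeq>-<lastSeq>" for a buffer flushed at flushTime.
     * Because the folder is derived on every call, it rolls over at midnight UTC.
     */
    static String keyFor(Instant flushTime, String firstSeq, String lastSeq) {
        return DAY_FOLDER.format(flushTime) + "/" + firstSeq + "-" + lastSeq;
    }
}
```

Calling keyFor(Instant.now(), firstSeq, lastSeq) at emit time would then yield an object key inside the current day's folder, which could be used instead of changing the bucket name in the constructor.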

poovizhi answered Oct 19 '22 22:10


The functionality you are looking for is unfortunately not available in the S3Emitter for Amazon Kinesis at this point. It simply works as a buffer that is flushed based on the amount of input data; see the corresponding comment in the source:

This implementation of IEmitter is used to store files from a Kinesis stream in S3. [...] When the buffer is full, this class's emit method adds the contents of the buffer to S3 as one file. The filename is generated from the first and last sequence numbers of the records contained in that file separated by a dash. [...] [emphasis mine]

Also, Kinesis has no first-class date concept for events (that is, data records); it deals only with sequence numbers, so you would need to add the date handling at the application level. See the section Data Record within Amazon Kinesis Terminology:

Data records are the units of data that are stored in an Amazon Kinesis stream. Data records are composed of a sequence number, a partition key, and a data blob, which is an un-interpreted, immutable sequence of bytes. The Amazon Kinesis service does not inspect, interpret, or change the data in the blob in any way. [...] [emphasis mine]
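As a rough illustration of what date handling at the application level could look like, the sketch below assumes the producer prefixes each blob with the event time as epoch milliseconds followed by a '|' separator. That wire format, and the names EventDateExtractor and eventDate, are assumptions made up for this example; Kinesis itself imposes no such layout.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

/** Hypothetical helper: reads an event date that the producer put into the blob. */
final class EventDateExtractor {

    private EventDateExtractor() {
    }

    /**
     * Assumed blob layout (not defined by Kinesis): "<epochMillis>|<payload>" in UTF-8.
     * Returns the UTC calendar date of the event, usable as a folder name.
     */
    static LocalDate eventDate(byte[] blob) {
        String text = new String(blob, StandardCharsets.UTF_8);
        int separator = text.indexOf('|');
        if (separator < 0) {
            throw new IllegalArgumentException("blob has no timestamp prefix");
        }
        long epochMillis = Long.parseLong(text.substring(0, separator));
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate();
    }
}
```

A consumer could call eventDate(blob).toString() to get a folder name such as 2014-06-15 and group records by it before writing to S3. Using the event's own timestamp rather than the flush time keeps late-arriving records in the folder for the day they occurred.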

Steffen Opel answered Oct 19 '22 22:10


Since 2014, AWS has offered new solutions, notably Kinesis Firehose, which does this very job. You just have to send the data from the Kinesis stream to Kinesis Firehose with a Lambda function and create the Firehose delivery stream in a few clicks.

DaMaill answered Oct 20 '22 00:10