Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to put data from server to Kinesis Stream

I am new to Kinesis. Reading out the documentation i found i can create the Kinesis Stream to get data from Producer. Then using KCL will read this data from Stream to further processing. I understand how to write the KCL application by implemeting IRecordProcessor .

However the very first stage as how to put data on Kinesis stream is still not clear to me. Do we have some AWS API which does need implementation to achieve this.

Scenarios: I have an server which is contineously getting data from various sources in the folders. Each folder is containing the text file whose rows are containing the required attributes for furhter analytical work. i have to push all these data to Kinesis Stream.

I need code something as below below class putData method wil be used to out in Kinesis stream

public class Put {

    AmazonKinesisClient kinesisClient;

    Put()
    {
        String accessKey = "My Access Key here" ;
        String secretKey = "My Secret Key here" ;
        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
       kinesisClient = new AmazonKinesisClient(credentials);
       kinesisClient.setEndpoint("kinesis.us-east-1.amazonaws.com", "kinesis", "us-east-1");
        System.out.println("starting the Put Application");
    }

    public void putData(String fileContent,String session) throws Exception
    {
         final String myStreamName = "ClickStream";

            PutRecordRequest putRecordRequest = new PutRecordRequest();
            putRecordRequest.setStreamName(myStreamName);
            String putData = fileContent;
            putRecordRequest.setData(ByteBuffer.wrap(putData.getBytes()));
            putRecordRequest.setPartitionKey("session"+session);
            PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
            System.out.println("Successfully putrecord, partition key : " + putRecordRequest.getPartitionKey()
                    + ", ShardID : " + putRecordResult.getShardId());
            System.out.println(fileContent);
            System.out.println("Sequence Number: "+putRecordResult.getSequenceNumber());

            System.out.println("Data has been PUT successfully");


    }
}

However reading file from the source folder from the server and then what design i should use to call putData to get the record on Kinesis stream. Do i need infinite loop and reading all files and then do this or some framework which will better do this with care of fault tolerance , single point of failure all . Any help would be greatly appreciated.

Briefly: I need a better technique to put regularly generated data to Kinesis Stream the data is generated at regular interval to server. Thanks

like image 728
Sam Avatar asked Jun 21 '14 16:06

Sam


People also ask

How do you add data to your Amazon Kinesis stream?

You can add data to a Kinesis data stream through PutRecord and PutRecords operations, Amazon Kinesis Producer Library (KPL), or Amazon Kinesis Agent.


2 Answers

If you are tailing some files, try Fluentd. http://www.fluentd.org/

Amazon Kinesis has a pretty nice plugin for that. https://github.com/awslabs/aws-fluent-plugin-kinesis

like image 60
az3 Avatar answered Oct 16 '22 03:10

az3


So it seems you are already using... http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html

Specific method you want is as follows.

You need a stream name, record, and stream key. http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/PutRecordResult.html

But it seems you have all this?

You would then need a program running always tailing your server log file and when ever there is a new line it will push this.

But your data will only sit their for 24 hours. You then need a worker program to consume the data and place it in some other AWS resource.

like image 35
Dan Ciborowski - MSFT Avatar answered Oct 16 '22 02:10

Dan Ciborowski - MSFT