I am new to Kinesis. Reading the documentation, I found that I can create a Kinesis stream to receive data from a producer, and then use the KCL to read that data from the stream for further processing. I understand how to write the KCL application by implementing IRecordProcessor .
However, the very first stage, how to put data onto the Kinesis stream, is still not clear to me. Is there an AWS API I need to implement to achieve this?
Scenario: I have a server which is continuously receiving data from various sources into folders. Each folder contains text files whose rows carry the attributes required for further analytical work. I have to push all of this data to a Kinesis stream.
I need code something like the class below, whose putData method will be used to put records onto the Kinesis stream:
import java.nio.ByteBuffer;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;

public class Put {

    AmazonKinesisClient kinesisClient;

    Put() {
        String accessKey = "My Access Key here";
        String secretKey = "My Secret Key here";
        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
        kinesisClient = new AmazonKinesisClient(credentials);
        kinesisClient.setEndpoint("kinesis.us-east-1.amazonaws.com", "kinesis", "us-east-1");
        System.out.println("starting the Put Application");
    }

    public void putData(String fileContent, String session) throws Exception {
        final String myStreamName = "ClickStream";

        // Build the request: stream name, payload bytes, and a partition key.
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setStreamName(myStreamName);
        putRecordRequest.setData(ByteBuffer.wrap(fileContent.getBytes()));
        putRecordRequest.setPartitionKey("session" + session);

        PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);

        System.out.println("Successfully put record, partition key : " + putRecordRequest.getPartitionKey()
                + ", ShardID : " + putRecordResult.getShardId());
        System.out.println(fileContent);
        System.out.println("Sequence Number: " + putRecordResult.getSequenceNumber());
        System.out.println("Data has been PUT successfully");
    }
}
However, after reading the files from the source folders on the server, what design should I use to call putData to get the records onto the Kinesis stream? Do I need an infinite loop reading all the files, or is there some framework that would do this better, taking care of fault tolerance and single points of failure? Any help would be greatly appreciated.
Briefly: I need a better technique to put regularly generated data onto a Kinesis stream; the data arrives at the server at regular intervals. Thanks.
You can add data to a Kinesis data stream through the PutRecord and PutRecords operations, the Amazon Kinesis Producer Library (KPL), or the Amazon Kinesis Agent.
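For example, if you want to batch several lines into a single request, a minimal sketch of a PutRecords call with the Java SDK could look like the following. The helper name, the reuse of the question's "session" partition-key scheme, and the idea of passing the client and stream name in as parameters are all assumptions for illustration, not a prescribed design:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;

public class BatchPut {

    // Hypothetical helper: sends a batch of lines to the stream in one PutRecords call.
    public static void putLines(AmazonKinesisClient kinesisClient, String streamName,
                                List<String> lines, String session) {
        List<PutRecordsRequestEntry> entries = new ArrayList<PutRecordsRequestEntry>();
        for (String line : lines) {
            PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
            entry.setData(ByteBuffer.wrap(line.getBytes()));
            entry.setPartitionKey("session" + session); // assumption: same key scheme as the question
            entries.add(entry);
        }

        PutRecordsRequest request = new PutRecordsRequest();
        request.setStreamName(streamName);
        request.setRecords(entries);

        PutRecordsResult result = kinesisClient.putRecords(request);
        // PutRecords is not all-or-nothing: check for per-record failures and retry them if needed.
        System.out.println("Failed record count: " + result.getFailedRecordCount());
    }
}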
If you are tailing some files, try Fluentd. http://www.fluentd.org/
Amazon Kinesis has a pretty nice plugin for that. https://github.com/awslabs/aws-fluent-plugin-kinesis
So it seems you are already using the AmazonKinesisClient: http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html
The specific method you want is putRecord. You need a stream name, the record data, and a partition key; the call returns a PutRecordResult: http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/PutRecordResult.html
But it seems you already have all of this?
You would then need a program that is always running, tailing your server log file, and whenever there is a new line it pushes it to the stream.
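A very rough sketch of such a tailer, building on the Put class from your question (the log file path, the one-second polling interval, and the fixed session value are just assumptions; a production version would also need to handle file rotation and remember its position across restarts):

import java.io.BufferedReader;
import java.io.FileReader;

public class LogTailer {

    public static void main(String[] args) throws Exception {
        Put put = new Put(); // the Put class from the question
        String session = "1"; // assumption: some identifier used in the partition key

        // Hypothetical log file path; in your setup this would be a file in one of the server folders.
        BufferedReader reader = new BufferedReader(new FileReader("/var/log/clickstream.log"));
        while (true) {
            String line = reader.readLine();
            if (line == null) {
                // No new data yet; wait a bit before polling again.
                Thread.sleep(1000);
            } else {
                put.putData(line, session);
            }
        }
    }
}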
But your data will only sit there for 24 hours. You then need a worker program to consume the data and place it in some other AWS resource.