
Amazon KCL Checkpoints and Trim Horizon

How are checkpoints and trimming related in the AWS KCL library?

The documentation page Handling Startup, Shutdown, and Throttling says:

By default, the KCL begins reading records from the tip of the stream, which is the most recently added record. In this configuration, if a data-producing application adds records to the stream before any receiving record processors are running, the records are not read by the record processors after they start up.

To change the behavior of the record processors so that it always reads data from the beginning of the stream, set the following value in the properties file for your Amazon Kinesis Streams application:

initialPositionInStream = TRIM_HORIZON

The documentation page Developing an Amazon Kinesis Client Library Consumer in Java says:

Streams requires the record processor to keep track of the records that have already been processed in a shard. The KCL takes care of this tracking for you by passing a checkpointer (IRecordProcessorCheckpointer) to processRecords. The record processor calls the checkpoint method on this interface to inform the KCL of how far it has progressed in processing the records in the shard. In the event that the worker fails, the KCL uses this information to restart the processing of the shard at the last known processed record.

The first page seems to say that the KCL resumes at the tip of the stream, while the second says it resumes at the last known processed record (the one the RecordProcessor marked as processed using the checkpointer). In my case, I definitely need to restart at the last known processed record. Do I need to set initialPositionInStream to TRIM_HORIZON?

Edmondo1984 asked Jul 23 '16 16:07


3 Answers

With a Kinesis stream you have two options for where a new application starts: it can read only the newest records (LATEST), or start from the oldest ones still in the stream (TRIM_HORIZON).

But once your application has been started, it reads from the position where it stopped, using its checkpoints. You can see those checkpoints in DynamoDB (the table name is usually the same as the application name). So if you restart your app, it will usually continue from where it stopped.

The answer is no, you don't need to set the initialPositionInStream to TRIM_HORIZON.
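To make the rule above concrete, here is a minimal sketch of the decision in plain Java. It is a simplified model, not the KCL's actual code: the class `StartPositionModel`, its methods, and the `"CHECKPOINT:"` prefix are hypothetical names used only for illustration, and an in-memory map stands in for the DynamoDB table.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of how a worker picks its starting point for a shard.
// Hypothetical names throughout; this is not the KCL API.
final class StartPositionModel {

    // Stand-in for the DynamoDB table: shardId -> last checkpointed sequence number.
    private final Map<String, String> checkpoints = new HashMap<>();

    // Stand-in for the initialPositionInStream setting ("LATEST" or "TRIM_HORIZON").
    private final String initialPositionInStream;

    StartPositionModel(String initialPositionInStream) {
        this.initialPositionInStream = initialPositionInStream;
    }

    // Called after records up to sequenceNumber have been processed.
    void checkpoint(String shardId, String sequenceNumber) {
        checkpoints.put(shardId, sequenceNumber);
    }

    // Where reading starts for this shard when the application (re)starts.
    String startPosition(String shardId) {
        String checkpoint = checkpoints.get(shardId);
        if (checkpoint != null) {
            // A stored checkpoint always wins over the configured initial position.
            return "CHECKPOINT:" + checkpoint;
        }
        // No checkpoint yet (first start): fall back to the configured value.
        return initialPositionInStream;
    }

    public static void main(String[] args) {
        StartPositionModel model = new StartPositionModel("LATEST");
        System.out.println(model.startPosition("shard-0")); // first start, no checkpoint
        model.checkpoint("shard-0", "4959");
        System.out.println(model.startPosition("shard-0")); // restart, checkpoint exists
    }
}
```

In this model the configured initial position only matters for a shard that has no checkpoint stored yet, which is why the setting does not affect a restart.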

itai ariel answered Oct 17 '22 17:10


When you are reading events from a Kinesis stream, you have 4 options:

TRIM_HORIZON - the oldest events that are still in the stream shards, before they are automatically trimmed (default 1 day, extendable up to 7 days at the time of writing). Use this option if you want to start a new application that processes all the records available in the stream. It will take a while until the application catches up and starts processing events in real time.

LATEST - the newest events in the stream, ignoring all past events. Use this option if you start a new application that should process events in real time immediately.

AT/AFTER_SEQUENCE_NUMBER - the sequence number is usually the checkpoint that you keep while you are processing the events. These checkpoints allow you to process the events reliably, even when a reader fails or when you want to deploy a new version and continue processing without losing any events. AT_SEQUENCE_NUMBER starts at the record with that sequence number and AFTER_SEQUENCE_NUMBER starts right after it, so the choice depends on whether you recorded the checkpoint before or after you successfully processed that event.

Please note that this is the only shard-specific option, as all the other options are global to the stream. When you use the KCL, it manages a DynamoDB table for that application, with one record per shard holding the "current" sequence number for that shard.

AT_TIMESTAMP - the approximate time at which the event was put into the stream. Use this option if you want to find specific events to process based on their timestamp. For example, when you know that a real-life event happened in your service at a specific time, you can develop an application that processes these specific events, even if you don't have the sequence number.

See more details in Kinesis documentation here: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
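The options above can be sketched as a small model that maps each one to the first record that would be read from a single shard. This is only an illustration in plain Java, not a call to the Kinesis API: `ShardIteratorModel`, `Rec`, and `startIndex` are hypothetical names, and sequence numbers are simplified to `long` values (real ones are long numeric strings).

```java
import java.util.List;

// Simplified model of where each option starts reading within one shard.
// Hypothetical names throughout; this does not call the Kinesis API.
final class ShardIteratorModel {

    enum IteratorType { TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER, AT_TIMESTAMP }

    // One record in the shard: its sequence number and approximate arrival time.
    static final class Rec {
        final long sequenceNumber;
        final long arrivalMillis;

        Rec(long sequenceNumber, long arrivalMillis) {
            this.sequenceNumber = sequenceNumber;
            this.arrivalMillis = arrivalMillis;
        }
    }

    // Returns the index (shard is ordered oldest to newest) of the first record read.
    // A result equal to shard.size() means "only records added from now on".
    // 'value' is a sequence number or a timestamp, depending on the type; it is
    // ignored for TRIM_HORIZON and LATEST.
    static int startIndex(List<Rec> shard, IteratorType type, long value) {
        switch (type) {
            case TRIM_HORIZON:
                return 0;                       // oldest record still retained
            case LATEST:
                return shard.size();            // just after the newest record
            case AT_SEQUENCE_NUMBER:
            case AFTER_SEQUENCE_NUMBER:
                for (int i = 0; i < shard.size(); i++) {
                    if (shard.get(i).sequenceNumber == value) {
                        // AT re-reads the record itself, AFTER skips past it.
                        return type == IteratorType.AT_SEQUENCE_NUMBER ? i : i + 1;
                    }
                }
                throw new IllegalArgumentException("unknown sequence number: " + value);
            case AT_TIMESTAMP:
                for (int i = 0; i < shard.size(); i++) {
                    if (shard.get(i).arrivalMillis >= value) {
                        return i;               // first record at or after the timestamp
                    }
                }
                return shard.size();
            default:
                throw new IllegalArgumentException("unsupported type: " + type);
        }
    }
}
```

With three records whose sequence numbers are 10, 20 and 30, AT_SEQUENCE_NUMBER with 20 starts at the second record, while AFTER_SEQUENCE_NUMBER with 20 starts at the third.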

Guy answered Oct 17 '22 17:10


You should use TRIM_HORIZON. It only takes effect the first time your application starts reading records from the stream. After that, the application continues from its last known position.

Roee Gavirel answered Oct 17 '22 18:10