
Kinesis: What is the best/safe way to shutdown a worker?

I am using the AWS Kinesis Client Library.

I need a way to shut down the Kinesis Worker thread during deployments so that I stop at a checkpoint and not in the middle of processRecords().

I see a shutdown boolean in Worker.java, but it is private.

The reason I need this is that checkpointing and idempotency are critical to me, and I don't want to kill the process in the middle of a batch.

[EDIT]

Thanks to @CaptainMurphy, I noticed that Worker.java exposes a shutdown() method, which safely shuts down the worker and the LeaseCoordinator. What it doesn't do is call the shutdown() method of the IRecordProcessor. It abruptly terminates the IRecordProcessor without regard for its state.

I understand that the KCL does not guarantee idempotency between checkpoints and that the developer should make the design fault tolerant, but I still feel that the IRecordProcessor should be shut down properly before the LeaseCoordinator stops.

Kalyanaraman Santhanam asked Apr 25 '15 00:04



2 Answers

Since version 1.7.1 (see the note below), an application can request a graceful shutdown, and record processors that implement IShutdownNotificationAware are given a chance to checkpoint before being shut down.

  • Make sure the record processor implements the IShutdownNotificationAware interface in addition to one of the IRecordProcessor interfaces. Call checkpoint in the shutdownRequested(IRecordProcessorCheckpointer checkpointer) method. Note that the shutdown method of IRecordProcessor should call checkpoint only if the shutdown reason is TERMINATE.
  • On application shutdown, initiate the worker shutdown process:

    Future<Void> shutdown = worker.requestShutdown();
    shutdown.get(); // wait for shutdown complete
    

PS: Versions of the Kinesis Client Library before 1.7.4 contain a race condition that prevents a correct shutdown, so use version 1.7.4 or later.
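
To make the two steps above concrete, here is a minimal sketch that compiles without the KCL on the classpath. Checkpointer, ShutdownReason, GracefulProcessor, GracefulWorker and ShutdownHooks are all hypothetical stand-ins that only mirror the shape of the KCL types named above; in a real application you would implement the KCL interfaces themselves. The timeout on the Future is my own assumption, not something the KCL requires.

```java
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the KCL checkpointer. The real checkpoint()
// can throw; error handling is omitted to keep the sketch short.
interface Checkpointer {
    void checkpoint();
}

// Hypothetical stand-in listing only the two reasons discussed here.
enum ShutdownReason { TERMINATE, ZOMBIE }

// Mirrors a processor that implements both an IRecordProcessor interface
// and IShutdownNotificationAware.
class GracefulProcessor {
    // Graceful path: the worker asked us to stop, so checkpoint now.
    void shutdownRequested(Checkpointer checkpointer) {
        checkpointer.checkpoint();
    }

    // Regular shutdown path: checkpoint only when the reason is TERMINATE.
    void shutdown(Checkpointer checkpointer, ShutdownReason reason) {
        if (reason == ShutdownReason.TERMINATE) {
            checkpointer.checkpoint();
        }
    }
}

// Hypothetical stand-in for the single Worker method used in this answer.
interface GracefulWorker {
    Future<Void> requestShutdown();
}

class ShutdownHooks {
    // Builds a thread suitable for Runtime.getRuntime().addShutdownHook(...).
    static Thread gracefulShutdownHook(GracefulWorker worker, long timeoutSeconds) {
        return new Thread(() -> {
            try {
                // Wait for the processors to checkpoint, but not forever.
                worker.requestShutdown().get(timeoutSeconds, TimeUnit.SECONDS);
            } catch (Exception e) {
                // Timed out or failed: records after the last checkpoint are
                // delivered again, so processing must still tolerate replays.
                System.err.println("Graceful shutdown did not complete: " + e);
            }
        });
    }
}
```

During a deployment you would register the hook once at startup, e.g. Runtime.getRuntime().addShutdownHook(ShutdownHooks.gracefulShutdownHook(worker, 30)), so that a normal process termination triggers the graceful path.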

Andrew Kolpakov answered Oct 22 '22 13:10


The Record Processor shutdown method does indeed get called when you call shutdown on the Worker. You can trace it back from the ShutdownTask class, which is created by the ShardConsumer class, which is closed by the Worker.

So you can checkpoint at the last processed record by listening for the shutdown call and passing the checkpointer the sequence number of the record you just finished processing. E.g., in your overridden processRecords():

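for(Record currRecord : records)
{
    // Finish the current record before looking at the flag.
    someProcessSingleRecordMethod(currRecord);
    if(shutdown)
    {
        // Checkpoint at the record just processed, then stop the batch.
        checkpointer.checkpoint(currRecord.getSequenceNumber());
        return;
    }
}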

Where your shutdown method sets the shutdown flag to true.
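
For completeness, here is a self-contained sketch of the same flag pattern that runs without the KCL. Rec, SeqCheckpointer and FlagAwareProcessor are hypothetical stand-ins for the KCL record, checkpointer and record processor types, and the processed list exists only so the behaviour can be observed. The flag is declared volatile as a precaution, on the assumption that shutdown() may be called from a different thread than the one running processRecords().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Kinesis record; only the sequence number matters here.
class Rec {
    private final String sequenceNumber;

    Rec(String sequenceNumber) {
        this.sequenceNumber = sequenceNumber;
    }

    String getSequenceNumber() {
        return sequenceNumber;
    }
}

// Hypothetical stand-in for the checkpointer's checkpoint(String) overload.
interface SeqCheckpointer {
    void checkpoint(String sequenceNumber);
}

class FlagAwareProcessor {
    // volatile so that a write from another thread is visible to the loop below.
    private volatile boolean shutdown = false;

    // Sequence numbers handled so far, kept only to make the sketch observable.
    final List<String> processed = new ArrayList<>();

    // Called when the worker shuts the processor down.
    void shutdown() {
        shutdown = true;
    }

    void processRecords(List<Rec> records, SeqCheckpointer checkpointer) {
        for (Rec rec : records) {
            processSingleRecord(rec);
            if (shutdown) {
                // Stop at a record boundary and remember how far we got.
                checkpointer.checkpoint(rec.getSequenceNumber());
                return;
            }
        }
    }

    // Placeholder for the real per-record work.
    void processSingleRecord(Rec rec) {
        processed.add(rec.getSequenceNumber());
    }
}
```

Because the flag is checked only after a record has been fully processed, the checkpoint always lands on a record boundary and the rest of the batch is left for the next consumer of the shard.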

Note that it's still best practice to design Kinesis applications in an "at least once" fashion, so that they cope with an ungraceful shutdown such as instance termination. Receiving and processing each record "just once" may not be a good use case for Kinesis.

Chris Riddell answered Oct 22 '22 14:10