
Amazon Kinesis & AWS Lambda Retries

I'm very new to Amazon Kinesis, so maybe this is just a problem in my understanding, but the AWS Lambda FAQ says:

The Amazon Kinesis and DynamoDB Streams records sent to your AWS Lambda function are strictly serialized, per shard. This means that if you put two records in the same shard, Lambda guarantees that your Lambda function will be successfully invoked with the first record before it is invoked with the second record. If the invocation for one record times out, is throttled, or encounters any other error, Lambda will retry until it succeeds (or the record reaches its 24-hour expiration) before moving on to the next record. The ordering of records across different shards is not guaranteed, and processing of each shard happens in parallel.

My question is: what happens if, for some reason, a producer puts malformed data onto a shard, and when the Lambda function picks it up it errors out and just keeps retrying? Processing of that particular shard would then be blocked by the error for 24 hours.

Is the best practice for handling application errors like that to wrap the problem in a custom error, send this error downstream along with all the successfully processed records, and let the consumer deal with it? Of course, this still wouldn't help in the case of an unrecoverable error that crashes the program, like a null pointer: we'd be back to the blocking retry loop for the next 24 hours.

Stefano asked Sep 10 '15



2 Answers

Don't overthink it: Kinesis is just a queue. You have to consume a record (i.e. pop it from the queue) successfully in order to proceed to the next one, just like any FIFO queue.

The appropriate approach should be:

  • Get a record from the stream.
  • Process it in a try-catch-finally block (a minimal sketch follows this list).
  • If the record is processed successfully, no problem. <- TRY
  • But if it fails, note it down somewhere else so you can investigate why it failed. <- CATCH
  • And at the end of your logic block, always persist the position to DynamoDB. <- FINALLY
  • If an internal error occurs in your system (memory error, hardware error, etc.), that is another story; it may affect the processing of all of the records, not just one.
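
A minimal sketch of that flow in a Node.js Lambda handler. The processRecord() function and the 'checkpoints' DynamoDB table are placeholders used for illustration, not names from the answer:

var AWS = require('aws-sdk');
var dynamo = new AWS.DynamoDB.DocumentClient();

exports.handler = function(event, context) {
    var lastSequenceNumber = null;

    event.Records.forEach(function(record) {
        // Kinesis data is base64 encoded so decode here
        var payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
        try {
            processRecord(payload);                        // <- TRY: your business logic
        } catch (err) {
            // <- CATCH: note the bad record somewhere you can investigate later
            console.error('Failed record', record.kinesis.sequenceNumber, err.message);
        } finally {
            // <- FINALLY: remember how far we got in this batch
            lastSequenceNumber = record.kinesis.sequenceNumber;
        }
    });

    // persist the position so a poison record does not block the shard
    dynamo.put({
        TableName: 'checkpoints',                          // placeholder table name
        Item: {
            // eventID looks like '<shardId>:<sequenceNumber>'
            shardId: event.Records[0].eventID.split(':')[0],
            sequenceNumber: lastSequenceNumber
        }
    }, function(err) {
        if (err) context.fail(err);
        else context.succeed();
    });
};

// placeholder business logic: throws on malformed input
function processRecord(payload) {
    JSON.parse(payload);
}

Because the failed record is only noted down rather than re-thrown, it doesn't block the rest of the shard.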

By the way, if processing a single record takes more than a minute, you are obviously doing something wrong. Kinesis is designed to handle thousands of records per second, so you don't have the luxury of running such long jobs for each of them.

The question you are asking is a general problem of queue systems, sometimes called a "poison message". You have to handle them in your business logic to be safe.

http://www.cogin.com/articles/SurvivingPoisonMessages.php#PoisonMessages

az3 answered Sep 21 '22


This is a common question about processing events in Kinesis, so I'll try to give you some pointers for building your Lambda function to handle such "corrupted" data. Since it is best practice to have separate parts of your system writing to the Kinesis stream and other parts reading from it, such problems are common.

First, why do you have such problematic events?

Using Kinesis to process your events is a good way to break a complex system that does both front-end processing (serving end users) and back-end processing (analyzing events) in the same code into two independent parts. The front-end people can focus on their business, while the back-end people don't need to push code changes to the front-end when they want to add functionality for their analytic use cases. Kinesis is a buffer of events that both removes the need for synchronization and simplifies the business logic code.

Therefore, we would like events written to the stream to be flexible in their "schema", and if the front-end teams wish to change the event format, add fields, delete fields, change the protocol or the encryption keys, they should be able to do that as often as they want.

Now it is up to the teams reading from the stream to process such flexible events in an efficient way, and not break their processing every time such a change happens. Therefore, it is to be expected that your Lambda function will see events it can't process, and a "poison pill" is not as rare an event as you might expect.

Second, how do you handle such problematic events?

Your Lambda function will get a batch of events to process. Note that you shouldn't receive the events one by one, but in large batches; if your batches are too small, you will quickly build up a large lag on the stream.
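
For reference, the batch size is a property of the event source mapping between the stream and the function. A sketch using the Node.js SDK; the function name, stream ARN and batch size below are placeholders:

var AWS = require('aws-sdk');
var lambda = new AWS.Lambda();

// Attach the stream to the function, delivering up to 100 records per invocation.
lambda.createEventSourceMapping({
    FunctionName: 'my-kinesis-consumer',
    EventSourceArn: 'arn:aws:kinesis:us-east-1:123456789012:stream/my-stream',
    BatchSize: 100,
    StartingPosition: 'LATEST'
}, function(err, data) {
    if (err) console.error(err);
    else console.log('Event source mapping created:', data.UUID);
});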

For each batch you iterate over the events, process them, and then checkpoint the last sequence ID of the batch in DynamoDB. Lambda does most of these steps for you automatically (see more here: http://docs.aws.amazon.com/lambda/latest/dg/walkthrough-kinesis-events-adminuser-create-test-function.html):

console.log('Loading function');

exports.handler = function(event, context) {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(function(record) {
        // Kinesis data is base64 encoded so decode here
        var payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
        console.log('Decoded payload:', payload);
    });
    context.succeed();
};

This is the "happy path", where all the events are processed without any problem. But if you hit a problem in the batch and don't "commit" the events with the success notification, the batch fails and you will receive all of its events again.

Now you need to decide what the reason for the processing failure is.

  • Temporary problem (throttling, network issue...) - it is OK to wait a second and retry a couple of times. In many cases the issue will resolve itself.

  • Occasional problem (out of memory...) - it is best to increase the memory allocation of the Lambda function or decrease the batch size. In many cases such a modification will resolve the issue.

  • Constant failure - it means that you have to either ignore the problematic event (put it in a DLQ - dead-letter queue) or modify your code to handle it (a sketch of this decision logic follows the list).
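
A rough sketch of that decision logic in a Node.js handler. The TransientError classification, the handleRecord() function and the DLQ_URL environment variable are all placeholders for illustration, not names from this answer:

var AWS = require('aws-sdk');
var sqs = new AWS.SQS();
var DLQ_URL = process.env.DLQ_URL;            // placeholder: URL of an SQS dead-letter queue

exports.handler = function(event, context) {
    var poison = [];

    for (var i = 0; i < event.Records.length; i++) {
        var payload = new Buffer(event.Records[i].kinesis.data, 'base64').toString('ascii');
        try {
            handleRecord(payload);            // placeholder for your business logic
        } catch (err) {
            if (err.name === 'TransientError') {
                // temporary problem (throttling, network...): fail the batch so Lambda retries it
                return context.fail(err);
            }
            // constant failure: remember the event so it can be parked and skipped
            poison.push(payload);
        }
    }

    if (poison.length === 0) {
        return context.succeed();
    }

    // park the poison events in the DLQ and succeed, so the shard is not blocked
    sqs.sendMessage({ QueueUrl: DLQ_URL, MessageBody: JSON.stringify(poison) }, function(err) {
        if (err) context.fail(err);           // could not park them: let Lambda retry the batch
        else context.succeed();
    });
};

// placeholder business logic: a malformed JSON event is treated as a permanent failure
function handleRecord(payload) {
    JSON.parse(payload);
}

Failing the whole batch on a transient error lets Lambda retry it in order, while parking the permanently broken events in a queue keeps the shard moving.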

The problem is identifying the type of failure in your code and handling each type differently. You need to write your Lambda code so that it can identify the failure (by the type of exception, for example) and react accordingly.

You can use the CloudWatch integration to write such failures to the logs and create the relevant alarms. You can also use CloudWatch Logs as a kind of "dead-letter queue", to see what the source of the problem is.
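
For example, anything written with console.error ends up in the function's CloudWatch Logs stream, and a custom metric can back an alarm. A small sketch; the namespace and metric name are made up:

var AWS = require('aws-sdk');
var cloudwatch = new AWS.CloudWatch();

function reportPoisonEvent(payload, err) {
    // goes to the function's CloudWatch Logs stream and is searchable later
    console.error(JSON.stringify({ reason: err.message, payload: payload }));

    // optional custom metric that a CloudWatch alarm can watch
    cloudwatch.putMetricData({
        Namespace: 'KinesisConsumer',                      // made-up namespace
        MetricData: [{ MetricName: 'PoisonEvents', Value: 1, Unit: 'Count' }]
    }, function(metricErr) {
        if (metricErr) console.error('Could not publish metric:', metricErr.message);
    });
}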

Guy answered Sep 19 '22