 

How do you handle Amazon Kinesis Record duplicates?

According to the Amazon Kinesis Streams documentation, a record can be delivered multiple times.

The only way to be sure every record is processed exactly once is to temporarily store records in a database that supports integrity checks (e.g. DynamoDB, ElastiCache, or MySQL/PostgreSQL), or to checkpoint the record ID for each Kinesis shard.

Do you know a better / more efficient way of handling duplicates?
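To make the "database with integrity checks" idea concrete, here is a minimal sketch of a once-only processor built on a unique-key insert. It uses SQLite so the example is self-contained; in practice this would be the MySQL/PostgreSQL or DynamoDB table mentioned above, and the table/column names here are hypothetical.

```python
import sqlite3

# In-memory SQLite stands in for the durable store; the PRIMARY KEY
# constraint provides the integrity check that rejects duplicates.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed (record_id TEXT PRIMARY KEY)")

def process_once(record_id, payload, handler):
    # The INSERT succeeds only the first time a record_id is seen;
    # rowcount == 0 means this is a duplicate delivery, so we skip it.
    cur = conn.execute(
        "INSERT OR IGNORE INTO processed (record_id) VALUES (?)", (record_id,)
    )
    if cur.rowcount == 1:
        handler(payload)
        return True
    return False

seen = []
process_once("shard-0:49590338271", "hello", seen.append)
process_once("shard-0:49590338271", "hello", seen.append)  # duplicate, ignored
print(seen)  # -> ['hello']
```

With a real database the insert and the handler's side effects would ideally share one transaction, otherwise a crash between them can still lose or double-process a record.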

Antonio asked Mar 27 '17 23:03


People also ask

How do I delete data from Kinesis stream?

You cannot delete previously inserted data from a stream, but you can read data using the KCL. The KCL creates a checkpoint after each batch of records is read, so when you move on to the next batch of new data, the KCL resumes from the last checkpoint stored in its DynamoDB table, and previously read data is not included in the next batch.
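The resume-after-checkpoint behaviour described above can be simulated in a few lines. This is a self-contained sketch, not the KCL API: the `checkpoints` dict stands in for the lease table the KCL keeps in DynamoDB, and the sequence numbers are illustrative.

```python
# Fake shard: ten records with increasing sequence numbers.
stream = [{"seq": i, "data": f"rec-{i}"} for i in range(10)]
checkpoints = {}  # stands in for the KCL's DynamoDB lease table

def read_batch(shard_id, batch_size):
    # Resume strictly *after* the last checkpointed sequence number,
    # so already-read records are never delivered again.
    last = checkpoints.get(shard_id, -1)
    batch = [r for r in stream if r["seq"] > last][:batch_size]
    if batch:
        checkpoints[shard_id] = batch[-1]["seq"]  # checkpoint after the batch
    return batch

first = read_batch("shard-0", 4)   # records 0..3
second = read_batch("shard-0", 4)  # resumes at 4, no overlap
print([r["seq"] for r in first], [r["seq"] for r in second])
```

Note that a crash after processing a batch but before checkpointing still causes that batch to be re-read, which is exactly the at-least-once behaviour the question is about.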

Is Kinesis exactly once?

Messaging semantics: Kinesis always uses “at least once” message delivery, whereas Kafka supports both “at least once” and “exactly once” message delivery. Message size: A single message in Kinesis can be up to 1MB. In Kafka, the max size is configurable.

How many put records per second does Amazon Kinesis Data Streams support?

It serves as a base throughput unit of a Kinesis data stream. A shard supports 1 MB/second and 1,000 records per second for writes and 2 MB/second for reads.

Does Kinesis data stream maintain order?

Amazon claims their Kinesis streaming product guarantees record ordering. It provides ordering of records, as well as the ability to read and/or replay records in the same order (...) Kinesis is composed of Streams that are themselves composed of one or more Shards. Records are stored in these Shards.


2 Answers

We had exactly that problem when building a telemetry system for a mobile app. In our case we were also unsure whether producers were sending each message exactly once, so for each received record we calculated its MD5 hash on the fly and checked whether it was present in some form of persistent storage. But indeed, which storage to use is the trickiest bit.

Firstly, we tried a plain relational database, but it quickly became a major bottleneck of the whole system, as this is not just a read-heavy but also a write-heavy case: the volume of data going through Kinesis was quite significant.

We ended up with a DynamoDB table storing the MD5 hash of each unique message. The issue was that deleting messages wasn't easy: even though our table had partition and sort keys, DynamoDB does not allow deleting all records with a given partition key in one operation, so we had to query all of them just to obtain the sort key values (which wastes time and capacity). Unfortunately, we had to simply drop the whole table once in a while. Another, similarly suboptimal, solution is to regularly rotate the DynamoDB tables that store the message identifiers.
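The hash-and-check step can be sketched as follows. A dict stands in for the DynamoDB table so the example is self-contained; in the real system this check would be a single conditional `PutItem` (with a condition that the hash attribute does not already exist), so that checking and writing happen atomically.

```python
import hashlib

table = {}  # stands in for the DynamoDB table keyed on the MD5 digest

def first_time_seen(record_body: bytes) -> bool:
    # Hash the record body on the fly, as the answer describes.
    digest = hashlib.md5(record_body).hexdigest()
    if digest in table:      # conditional write would fail -> duplicate
        return False
    table[digest] = True     # conditional write succeeded -> first delivery
    return True

print(first_time_seen(b"event-1"))  # -> True
print(first_time_seen(b"event-1"))  # -> False (duplicate delivery detected)
```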

However, DynamoDB recently introduced a very handy feature, Time To Live (TTL), which means we can now control the size of the table by enabling auto-expiry on a per-record basis. In that sense DynamoDB becomes quite similar to ElastiCache; however, ElastiCache (at least a Memcached cluster) is much less durable: there is no redundancy, and all data residing on terminated nodes is lost during a scale-in operation or a failure.
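With TTL, each dedup record carries its own expiry timestamp and DynamoDB deletes it for you, so no table dropping or rotation is needed. A sketch of building such an item is below; the attribute name `expires_at` and the one-day retention window are assumptions, not values from the answer.

```python
import time

DEDUP_WINDOW_SECONDS = 24 * 3600  # keep hashes for one day (assumption)

def dedup_item(md5_digest, now=None):
    # TTL expects the expiry as a Unix epoch (seconds) in a plain
    # numeric attribute; DynamoDB removes the item after that time.
    now = time.time() if now is None else now
    return {
        "md5": md5_digest,                              # partition key
        "expires_at": int(now) + DEDUP_WINDOW_SECONDS,  # TTL attribute
    }

item = dedup_item("9a0364b9e99bb480dd25e1f0284c8555", now=1_700_000_000)
print(item)  # expires_at = 1700000000 + 86400
```

TTL deletion is background and not instantaneous, so expired items may briefly remain readable; that is usually fine for a dedup cache.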

Dmitry Deryabin answered Sep 16 '22 12:09


What you mention is a general problem of all queue systems that use the "at least once" approach. And it is not just the queue systems: both producers and consumers may process the same message multiple times (due to ReadTimeout errors, etc.). Kinesis and Kafka both use that paradigm. Unfortunately, there is no easy answer.

You may also try an "exactly-once" message queue with a stricter transactional approach. For example, AWS SQS does that with FIFO queues: https://aws.amazon.com/about-aws/whats-new/2016/11/amazon-sqs-introduces-fifo-queues-with-exactly-once-processing-and-lower-prices-for-standard-queues/ . Be aware that SQS throughput is far lower than that of Kinesis.
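SQS FIFO deduplication works on the producer side: within a 5-minute window, messages carrying the same `MessageDeduplicationId` are accepted only once. The real call is `send_message(..., MessageGroupId=..., MessageDeduplicationId=...)`; the sketch below simulates the queue's dedup window with a dict so it is self-contained.

```python
DEDUP_WINDOW = 300  # seconds; the 5-minute SQS FIFO dedup interval

_seen = {}  # dedup_id -> time first accepted (simulates the queue's cache)

def accept(dedup_id, now):
    # A resend with the same dedup_id inside the window is silently
    # dropped by the queue; after the window it is treated as new.
    first = _seen.get(dedup_id)
    if first is not None and now - first < DEDUP_WINDOW:
        return False
    _seen[dedup_id] = now
    return True

print(accept("order-42", now=0.0))    # -> True
print(accept("order-42", now=100.0))  # -> False (within 5 minutes)
print(accept("order-42", now=400.0))  # -> True  (window elapsed)
```

Note this protects against duplicate *sends*; a consumer that crashes mid-processing can still see a message twice, so consumer-side idempotency is still needed.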

To solve your problem, you should be aware of your application domain and try to solve it internally, as you suggested (database checks). Especially when you communicate with an external service (say, an email server), you should be able to recover the operation's state in order to prevent double processing (double sending, in the email server example, may result in multiple copies of the same message in the recipient's mailbox).

See also the following concepts:

  1. At-least-once Delivery: http://www.cloudcomputingpatterns.org/at_least_once_delivery/
  2. Exactly-once Delivery: http://www.cloudcomputingpatterns.org/exactly_once_delivery/
  3. Idempotent Processor: http://www.cloudcomputingpatterns.org/idempotent_processor/
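The idempotent-processor pattern applied to the email example above might look like this minimal sketch. `send_email`, the state store, and the record fields are hypothetical stand-ins; in production the state would live in a durable store checked before every external call.

```python
sent_state = {}  # message_id -> "sent"; would be a durable store in practice
outbox = []

def send_email(to, body):
    # Stand-in for the external email server.
    outbox.append((to, body))

def handle(record):
    # Recover the operation's state before acting: a redelivered record
    # finds the "sent" marker and is skipped instead of emailing twice.
    if sent_state.get(record["id"]) == "sent":
        return
    send_email(record["to"], record["body"])
    sent_state[record["id"]] = "sent"

msg = {"id": "m1", "to": "a@example.com", "body": "hi"}
handle(msg)
handle(msg)  # redelivered by the stream; no second email
print(len(outbox))  # -> 1
```

A crash between the send and the state write can still double-send, which is why true exactly-once delivery to an external service is hard without the service itself supporting idempotency keys.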
az3 answered Sep 17 '22 12:09