I read the official AWS Kinesis Firehose documentation, but it doesn't mention how to handle duplicate events. Does anyone have experience with this? I found that some people use ElastiCache for filtering; does that mean I need to use AWS Lambda to encapsulate the filtering logic? Is there a simple way, like Firehose, to ingest data into Redshift and at the same time get "exactly once" semantics? Thanks a lot!
Kinesis Data Firehose delivers your data to your S3 bucket first and then issues an Amazon Redshift COPY command to load the data into your Amazon Redshift cluster. Specify an S3 bucket that you own where the streaming data should be delivered.
Firehose automatically delivers the data to the Amazon S3 bucket or Amazon Redshift table that you specify in the delivery stream. For information about supported versions, see Supported Systems and Versions. To write data to Amazon Kinesis Streams, use the Kinesis Producer destination.
Kinesis Data Streams is a low-latency streaming service in AWS Kinesis built for ingesting data at scale. Kinesis Firehose, on the other hand, is a data transfer service: its primary purpose is loading streaming data into Amazon S3, Splunk, Elasticsearch, and Redshift.
In addition, Kinesis Data Streams synchronously replicates data across three Availability Zones, providing high availability and data durability.
You can get duplicates on both sides of a Kinesis stream: a producer might put the same event into the stream twice, and a consumer might read the same event twice.
On the producer side, this happens when you try to put an event into the Kinesis stream, are not sure whether it was written successfully, and decide to put it again. On the consumer side, it happens when a worker gets a batch of events, starts processing them, and crashes before it manages to checkpoint its position; the next worker then picks up the same batch of events from the stream, based on the last checkpointed sequence number.
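To make the consumer-side case concrete, here is a toy simulation in Python. It is not the KCL or any AWS API: the stream is just a list, the checkpoint is an index, and `consume` is a hypothetical helper invented for this illustration.

```python
def consume(stream, checkpoint, batch_size, crash_before_checkpoint=False):
    """Read one batch starting at `checkpoint`.

    Returns (events_processed, checkpoint_after). If the worker "crashes"
    before checkpointing, the events were still processed but the stored
    checkpoint does not advance.
    """
    batch = stream[checkpoint:checkpoint + batch_size]
    processed = list(batch)  # stand-in for real processing
    if crash_before_checkpoint:
        return processed, checkpoint
    return processed, checkpoint + len(batch)


stream = ["e1", "e2", "e3", "e4"]

# Worker 1 processes the first batch, then crashes before checkpointing.
first, checkpoint = consume(stream, 0, 2, crash_before_checkpoint=True)

# Worker 2 resumes from the last stored checkpoint and gets the same batch.
second, checkpoint = consume(stream, checkpoint, 2)

all_processed = first + second
```

Here `e1` and `e2` end up in `all_processed` twice, which is the at-least-once behavior described above.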
Before you start solving this problem, evaluate how often such duplication occurs and what its business impact is. Not every system handles financial transactions that can't tolerate duplicates. Nevertheless, if you decide that you need de-duplication, a common way to solve it is to attach an event ID to each event and track whether you have already processed that event ID.
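For the event ID to be useful, it has to be assigned once, before the first put attempt, so that a retry carries the same ID. A minimal Python sketch of that idea follows. The `put` callable and the `FlakyStream` class are hypothetical stand-ins (not the Kinesis API), and using a random UUID as the ID is my assumption.

```python
import uuid


def stamp_event(payload):
    """Attach an event ID once, before the first put attempt."""
    return {"event_id": str(uuid.uuid4()), "payload": payload}


def put_with_retry(event, put, max_attempts=3):
    """Call put(event) until it returns without raising.

    The same stamped event is re-sent on every attempt, so duplicates in
    the stream share one event_id. Returns the number of attempts used.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            put(event)
            return attempt
        except ConnectionError:
            if attempt == max_attempts:
                raise


class FlakyStream:
    """Hypothetical stream: the first write succeeds but its ack is lost."""

    def __init__(self):
        self.records = []
        self._calls = 0

    def put(self, event):
        self._calls += 1
        self.records.append(event)  # the record is stored either way
        if self._calls == 1:
            raise ConnectionError("ack lost")
```

With `FlakyStream`, one logical event is stored twice, but both copies have the same `event_id`, so a consumer can recognize the second one as a duplicate.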
ElastiCache with Redis is a good place to track your event IDs. Every time you pick up an event for processing, you check whether its ID is already in the hash table in Redis. If it is, you skip the event; if it isn't, you add it to the table (with a TTL based on the possible time window for such duplication).
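A rough Python sketch of this check is below. Instead of a separate lookup followed by an insert into a hash, it uses a single `SET` with the `nx` and `ex` options (as exposed by the redis-py client's `set` method), which does the check, the write, and the TTL in one step. The key prefix, the one-hour TTL, and the `event_id` field name are assumptions, and `FakeRedis` is a hypothetical in-memory stand-in so the example runs without a server.

```python
import time


def is_first_time(client, event_id, ttl_seconds=3600):
    """Return True only the first time event_id is seen within the TTL."""
    # With nx=True the key is written only if it does not exist yet;
    # ex sets the expiry in seconds. redis-py returns True when the key
    # was written and None when it already existed.
    return bool(client.set(f"seen:{event_id}", 1, nx=True, ex=ttl_seconds))


def process_batch(client, events, handle):
    """Call handle(event) for events whose ID has not been seen yet."""
    for event in events:
        if is_first_time(client, event["event_id"]):
            handle(event)


class FakeRedis:
    """Hypothetical stand-in that mimics only set(name, value, nx=, ex=)."""

    def __init__(self):
        self._expires_at = {}

    def set(self, name, value, nx=False, ex=None):
        now = time.monotonic()
        expires_at = self._expires_at.get(name)
        if nx and expires_at is not None and expires_at > now:
            return None  # key exists and has not expired
        self._expires_at[name] = float("inf") if ex is None else now + ex
        return True
```

In real use you would pass a `redis.Redis(...)` client pointed at your ElastiCache endpoint instead of `FakeRedis()`. Note that this marks the ID before the event is handled, so a crash between the two steps would drop the event; whether that trade-off is acceptable depends on your system.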
If you choose to use Kinesis Firehose (instead of Kinesis Streams), you no longer have control over the consumer application, so you can't implement this process there. You therefore have three options: run the de-duplication logic on the producer side, switch to Kinesis Streams and run your own code in Lambda or KCL, or settle for the de-duplication functions in Redshift (see below).
If you are not too sensitive to duplication, you can handle it at query time with functions in Redshift, such as COUNT(DISTINCT ...) or LAST_VALUE in a window function.
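To show what those two queries look like, here is a small Python sketch that runs them against an in-memory SQLite database as a local stand-in for Redshift. The `events` table and its columns are made up for the example, and the SQL would need to be checked against Redshift before relying on it. It needs an SQLite build with window function support (3.25 or later).

```python
import sqlite3

# Count all rows versus distinct event IDs.
COUNT_SQL = "SELECT COUNT(*), COUNT(DISTINCT event_id) FROM events"

# Keep one row per event_id, taking the amount from the latest copy.
LATEST_SQL = """
SELECT DISTINCT
    event_id,
    LAST_VALUE(amount) OVER (
        PARTITION BY event_id
        ORDER BY received_at
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) AS latest_amount
FROM events
ORDER BY event_id
"""


def load_sample(conn):
    """Create the hypothetical events table with some duplicated rows."""
    conn.execute(
        "CREATE TABLE events (event_id TEXT, amount INTEGER, received_at INTEGER)"
    )
    conn.executemany(
        "INSERT INTO events VALUES (?, ?, ?)",
        [("a", 10, 1), ("a", 10, 2), ("b", 5, 3), ("c", 7, 4), ("c", 8, 5)],
    )


def counts(conn):
    """Return (total_rows, distinct_event_ids)."""
    return conn.execute(COUNT_SQL).fetchone()


def latest_amounts(conn):
    """Return one (event_id, latest_amount) row per event ID."""
    return conn.execute(LATEST_SQL).fetchall()
```

With the sample rows, `counts` reports five rows but only three distinct events, and `latest_amounts` collapses the duplicates to one row per event ID.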