
Partition Kinesis Firehose S3 records by event time

Firehose->S3 uses the current date as a prefix when creating keys in S3, so the data is partitioned by the time the record is written. My Firehose stream contains events which have a specific event time.

Is there a way to create S3 keys containing this event time instead? Processing tools downstream depend on each event being in an "hour-folder" corresponding to when it actually happened. Or would that have to be an additional processing step after Firehose is done?

The event time could be in the partition key, or I could use a Lambda function to parse it from the record.

asked Feb 09 '17 by bibac

2 Answers

You'll need to do some post-processing or write a custom streaming consumer (such as Lambda) to do this.
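If you do go the Lambda route, a minimal sketch of such a consumer might look like the following, reading straight off the Kinesis stream rather than through Firehose. The bucket name, the JSON payload shape, and the event_time field are all assumptions here, and at any real volume you'd batch writes instead of putting one object per record:

    import base64
    import json

    import boto3

    s3 = boto3.client("s3")
    BUCKET = "my-events-bucket"  # hypothetical destination bucket

    def handler(event, context):
        # Triggered by the Kinesis stream; writes each record under a key
        # derived from the record's own event_time, not its arrival time.
        for record in event["Records"]:
            payload = base64.b64decode(record["kinesis"]["data"])
            body = json.loads(payload)
            # Assumes an ISO-8601 event_time, e.g. "2017-02-09T06:02:00Z".
            hour = body["event_time"][:13].replace("T", "/")  # "2017-02-09/06"
            key = "events/%s/%s" % (hour, record["kinesis"]["sequenceNumber"])
            s3.put_object(Bucket=BUCKET, Key=key, Body=payload)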

We dealt with a huge event volume at my company, so writing a Lambda function didn't seem like a good use of money. Instead, we found batch processing with Athena to be a really simple solution.

First, you stream into an Athena table, events, which can optionally be partitioned by arrival time.
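As a sketch, that first table might be defined like this via boto3; the scratch bucket, database location, columns, and SerDe are illustrative, not prescriptive:

    import boto3

    athena = boto3.client("athena")

    def run_query(sql):
        # Submit a query to Athena; results go to a scratch bucket.
        return athena.start_query_execution(
            QueryString=sql,
            ResultConfiguration={"OutputLocation": "s3://my-athena-results/"},
        )["QueryExecutionId"]

    # Raw events as Firehose delivers them, partitioned by arrival time.
    run_query("""
    CREATE EXTERNAL TABLE IF NOT EXISTS events (
        event_id   string,
        event_time timestamp,
        payload    string
    )
    PARTITIONED BY (arrival_date string)
    ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
    LOCATION 's3://my-firehose-bucket/events/'
    """)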

Then, you define another Athena table, say events_by_event_time, which is partitioned by the event_time attribute on your event (or however it's defined in your schema).
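A matching sketch for the second table, reusing the run_query helper above; here the partition is the event's hour, which lines up with the hour-folder layout the question asks for:

    # The same events, laid out by the hour they actually happened.
    run_query("""
    CREATE EXTERNAL TABLE IF NOT EXISTS events_by_event_time (
        event_id   string,
        event_time timestamp,
        payload    string
    )
    PARTITIONED BY (event_hour string)
    ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
    LOCATION 's3://my-firehose-bucket/events_by_event_time/'
    """)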

Finally, you schedule a process that runs an Athena INSERT INTO query, taking events from events and repartitioning them into events_by_event_time. Now your events are partitioned by event_time without requiring EMR, Data Pipeline, or any other infrastructure.
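The scheduled step could be as small as the following Lambda handler on an hourly trigger; the watermark logic you'd want in order to avoid copying the same events twice is elided:

    import boto3

    athena = boto3.client("athena")

    # Note: Athena's INSERT INTO requires the partition column to come
    # last in the SELECT list.
    REPARTITION_SQL = """
    INSERT INTO events_by_event_time
    SELECT event_id,
           event_time,
           payload,
           date_format(event_time, '%Y-%m-%d-%H') AS event_hour
    FROM events
    WHERE arrival_date = cast(current_date AS varchar)
    """

    def handler(event, context):
        # Runs on an hourly schedule, e.g. a CloudWatch Events rule.
        athena.start_query_execution(
            QueryString=REPARTITION_SQL,
            ResultConfiguration={"OutputLocation": "s3://my-athena-results/"},
        )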

You can do this with any attribute on your events. It's also worth noting that you can create a view that does a UNION of the two tables, so you can query real-time and historic events together.
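Such a view might look like this, again via the run_query helper; how you deduplicate events that appear in both tables depends on your INSERT schedule, so that's left out:

    # One view over both layouts, so queries see recent and
    # repartitioned events together.
    run_query("""
    CREATE OR REPLACE VIEW all_events AS
    SELECT event_id, event_time, payload FROM events
    UNION ALL
    SELECT event_id, event_time, payload FROM events_by_event_time
    """)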

I actually wrote more about this in a blog post here.

answered Sep 17 '22 by J Kao


Kinesis Firehose doesn't (yet) allow clients to control how the date prefix of the final S3 objects is generated.

The only option available to you is to add a post-processing layer after Kinesis Firehose. For example, you could schedule an hourly EMR job, using Data Pipeline, that reads all the files written in the last hour and publishes them to the correct S3 destinations.
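To illustrate what that post-processing layer does, here is the same repartitioning as a plain boto3 script; the bucket, the default arrival-time prefix layout, and an ISO-8601 event_time field in newline-delimited JSON records are all assumptions, and at real volume you'd run this logic as a Spark/Hive step on EMR instead:

    import json
    from datetime import datetime, timedelta, timezone

    import boto3

    s3 = boto3.client("s3")
    BUCKET = "my-firehose-bucket"  # hypothetical

    def repartition_last_hour():
        # Firehose's default key prefix is the arrival time: "YYYY/MM/DD/HH/".
        prev_hour = datetime.now(timezone.utc) - timedelta(hours=1)
        src_prefix = prev_hour.strftime("%Y/%m/%d/%H/")

        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=BUCKET, Prefix=src_prefix):
            for obj in page.get("Contents", []):
                body = s3.get_object(Bucket=BUCKET, Key=obj["Key"])["Body"].read()

                # Group the newline-delimited JSON records by event hour.
                groups = {}
                for line in body.splitlines():
                    record = json.loads(line)
                    hour = record["event_time"][:13].replace("T", "/")
                    groups.setdefault(hour, []).append(line)

                # Write one object per event hour under the corrected prefix.
                name = obj["Key"].rsplit("/", 1)[-1]
                for hour, lines in groups.items():
                    dest_key = "by-event-time/%s/%s" % (hour, name)
                    s3.put_object(Bucket=BUCKET, Key=dest_key, Body=b"\n".join(lines))

    if __name__ == "__main__":
        repartition_last_hour()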

answered Sep 18 '22 by ketan vijayvargiya