I'm currently sending a series of XML messages to an AWS Kinesis stream. I've been using this on different projects, so I'm fairly confident that this bit works. I've then written a Lambda to process events from the Kinesis stream into a Kinesis Firehose delivery stream:
import os
import boto3
import base64

firehose = boto3.client('firehose')

def lambda_handler(event, context):
    deliveryStreamName = os.environ['FIREHOSE_STREAM_NAME']

    # Forward each record from the Kinesis event directly to Firehose.
    for record in event['Records']:
        # Kinesis record data arrives base64-encoded in the Lambda event.
        data = base64.b64decode(record['kinesis']['data'])
        response = firehose.put_record(
            DeliveryStreamName=deliveryStreamName,
            Record={'Data': data}
        )
        print(response)
I've set the Kinesis stream as the Lambda trigger, with a batch size of 1 and starting position LATEST.
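For reference, that trigger can also be created programmatically with boto3's create_event_source_mapping; the function name and stream ARN below are placeholders:

import boto3

lambda_client = boto3.client('lambda')

# Placeholder function name and stream ARN; batch size 1 and LATEST match the setup above.
lambda_client.create_event_source_mapping(
    FunctionName='my-kinesis-to-firehose-lambda',
    EventSourceArn='arn:aws:kinesis:eu-west-1:123456789012:stream/my-stream',
    BatchSize=1,
    StartingPosition='LATEST'
)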
For the kinesis firehose I have the following config:
Data transformation*: Disabled
Source record backup*: Disabled
S3 buffer size (MB)*: 10
S3 buffer interval (sec)*: 60
S3 Compression: UNCOMPRESSED
S3 Encryption: No Encryption
Status: ACTIVE
Error logging: Enabled
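If it helps, the same configuration and status can be read back with boto3's describe_delivery_stream; the delivery stream name below is a placeholder:

import boto3

firehose = boto3.client('firehose')

# Placeholder delivery stream name; the description includes buffering,
# compression, encryption and the delivery stream status.
description = firehose.describe_delivery_stream(DeliveryStreamName='my-delivery-stream')
print(description['DeliveryStreamDescription']['DeliveryStreamStatus'])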
I sent 162 events and read them back from S3, and the most I've managed to get is 160; usually it's less. I've even tried waiting a few hours in case something strange was happening with retries.
Has anyone had experience with Kinesis -> Lambda -> Firehose and seen issues of lost data?
Kinesis Data Streams is a low-latency streaming service in AWS Kinesis with the ability to ingest data at scale. Kinesis Firehose, on the other hand, is a data delivery service: its primary purpose is loading streaming data into Amazon S3, Splunk, Elasticsearch, and Redshift.
A Kinesis data stream stores records for 24 hours by default, up to 8760 hours (365 days). You can update the retention period via the Kinesis Data Streams console or by using the IncreaseStreamRetentionPeriod and DecreaseStreamRetentionPeriod operations.
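For example, the retention period can be extended with boto3 along these lines (the stream name is a placeholder):

import boto3

kinesis = boto3.client('kinesis')

# Extend retention from the default 24 hours to 7 days (168 hours).
kinesis.increase_stream_retention_period(
    StreamName='my-stream',
    RetentionPeriodHours=168
)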
If your data source is Direct PUT and data delivery to your Amazon S3 bucket fails, then Amazon Kinesis Data Firehose will retry delivering the data every 5 seconds for up to a maximum period of 24 hours.
From what I see here, the items are most likely lost when you are publishing data to the Kinesis stream (not Firehose).
Since you are using put_record when writing to Firehose, it will throw an exception on failure and the Lambda will be retried in that case (it makes sense to check whether there are failures at that level).
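A minimal sketch of what checking at that level could look like, using a hypothetical forward_record helper and re-raising so the Lambda invocation is retried and the failure shows up in the logs:

import botocore.exceptions

def forward_record(firehose, delivery_stream_name, data):
    # Surface Firehose failures instead of silently swallowing them;
    # re-raising makes Lambda retry the batch and records the error.
    try:
        response = firehose.put_record(
            DeliveryStreamName=delivery_stream_name,
            Record={'Data': data}
        )
        print(response)
    except botocore.exceptions.ClientError as err:
        print(f"put_record failed: {err}")
        raise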
Considering that, I would suppose the records are lost before they reach the Kinesis stream.
If you are sending items to the Kinesis stream using the put_records method, there is no guarantee that all of the records will be written to the stream (due to exceeded write throughput or internal errors); some of the records may fail to be sent. In that case the failed subset of records should be resent by your code (here is a Java example, sorry I wasn't able to find a Python one; a rough Python sketch follows below).
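A rough Python sketch of that resend logic, assuming records shaped like {'Data': ..., 'PartitionKey': ...} and a placeholder stream name; put_records reports per-record failures via FailedRecordCount and ErrorCode:

import time
import boto3

kinesis = boto3.client('kinesis')

def put_records_with_retry(stream_name, records, max_attempts=5):
    # records is a list of dicts like {'Data': b'<xml/>', 'PartitionKey': 'key-1'}.
    pending = records
    for attempt in range(max_attempts):
        response = kinesis.put_records(StreamName=stream_name, Records=pending)
        if response['FailedRecordCount'] == 0:
            return
        # Keep only the records whose result entry carries an ErrorCode,
        # i.e. the subset that failed, and resend just those.
        pending = [
            record
            for record, result in zip(pending, response['Records'])
            if 'ErrorCode' in result
        ]
        # Simple exponential backoff before retrying the failed subset.
        time.sleep(0.1 * (2 ** attempt))
    raise RuntimeError(f"{len(pending)} records still failing after {max_attempts} attempts")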