I've read a lot of similar questions about adding newline characters to Firehose, but they all involve adding the newline character at the source. The problem is that I don't have access to the source: a third party is piping data into our Kinesis Data Firehose stream, so I cannot add the '\n' there.
I've tried doing a firehose data transformation using the following code:
'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => {
    /* Process the list of records and transform them */
    const output = [];
    event.records.forEach((record) => {
        const results = {
            /* This transformation is the "identity" transformation, the data is left intact */
            recordId: record.recordId,
            result: record.data.event_type === 'alert' ? 'Dropped' : 'Ok',
            data: record.data + '\n'
        };
        output.push(results);
    });
    console.log(`Processing completed. Successful records ${output.length}.`);
    callback(null, { records: output });
};
but the newline is still lost. I've also tried JSON.stringify(record.data) + '\n', but then I get an "Invalid output structure" error.
Dynamic partitioning enables you to continuously partition streaming data in Kinesis Data Firehose by using keys within data (for example, customer_id or transaction_id) and then deliver the data grouped by these keys into corresponding Amazon Simple Storage Service (Amazon S3) prefixes.
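As a rough illustration (the key name customer_id is just a placeholder, and this assumes inline JQ metadata extraction is enabled on the delivery stream), a dynamically partitioned S3 prefix can look like:

customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/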
Amazon Kinesis Data Firehose captures, transforms, and loads streaming data into downstream services such as Kinesis Data Analytics or Amazon S3. You can write Lambda functions to request additional, customized processing of the data before it is sent downstream.
Support for multiple data destinations: Amazon Kinesis Data Firehose currently supports Amazon S3, Amazon Redshift, Amazon OpenSearch Service, HTTP endpoints, Datadog, New Relic, MongoDB, and Splunk as destinations.
Kinesis Data Firehose delivers your data to your S3 bucket first and then issues an Amazon Redshift COPY command to load the data into your Amazon Redshift cluster. Specify an S3 bucket that you own where the streaming data should be delivered. Create a new S3 bucket, or choose an existing bucket that you own.
Try decoding record.data, adding a newline, and then encoding it again as base64.
This is Python, but the idea is the same:
import base64

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        # Firehose hands the record data to Lambda base64-encoded, so decode it first
        payload = base64.b64decode(record['data']).decode('utf-8')

        # Do custom processing on the payload here
        payload = payload + '\n'

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            # the transformed data must be returned base64-encoded again
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    return {'records': output}
From the comment of @Matt Westlake:
For those looking for the node answer, it's
const data = JSON.parse(Buffer.from(record.data, 'base64').toString('utf8'));
and
Buffer.from(JSON.stringify(data) + '\n').toString('base64')
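Putting the question's handler together with that fix, a corrected transform might look roughly like the sketch below. It assumes each record's payload is a single JSON object (so JSON.parse succeeds) and keeps the event_type check from the question; adjust both to match the actual payload format.

'use strict';

exports.handler = (event, context, callback) => {
    const output = event.records.map((record) => {
        // Firehose delivers record.data base64-encoded, so decode it before inspecting it
        const data = JSON.parse(Buffer.from(record.data, 'base64').toString('utf8'));

        return {
            recordId: record.recordId,
            result: data.event_type === 'alert' ? 'Dropped' : 'Ok',
            // re-encode with a trailing newline so the objects land newline-delimited in S3
            data: Buffer.from(JSON.stringify(data) + '\n').toString('base64')
        };
    });

    console.log(`Processing completed. Successful records ${output.length}.`);
    callback(null, { records: output });
};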