Can I automatically append newlines to AWS Firehose records?

I am trying to configure a Kinesis Analytics application with the following settings:

  • Input stream is a Kinesis Firehose which is taking stringified JSON values
  • The SQL is a simple passthrough (it needs to be more complicated later but for testing, it just sends the data through)
  • Output stream is a second Kinesis Firehose which delivers records to an S3 bucket

Later down the line, I will import the contents of the S3 bucket using Hive + JSONSERDE, which expects each JSON record to live on its own line. The Firehose output simply concatenates the JSON records with no separator, which breaks JSONSERDE.
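To illustrate the problem, here is a minimal sketch (with made-up records) of what the Firehose output looks like versus what JSONSERDE expects:

```python
import json

# Hypothetical records matching the a/b/c schema in the question.
records = [{"a": "foo1", "b": "bar1", "c": "baz1"},
           {"a": "foo2", "b": "bar2", "c": "baz2"}]

# What Firehose writes to S3: records concatenated with no separator.
concatenated = "".join(json.dumps(r) for r in records)
print(concatenated)

# What JSONSERDE expects: one JSON record per line.
newline_delimited = "\n".join(json.dumps(r) for r in records)
print(newline_delimited)
```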

I could attach an AWS Lambda data formatter to the output stream, but that seems expensive. All I want is to separate the records with newlines.

If I were doing this without an Analytics app, I would append the newline to each Firehose record myself. It seems strange that there is no way to do that in the app's SQL:

CREATE OR REPLACE STREAM "STREAM_OUT" (
  a VARCHAR(4),
  b VARCHAR(4),
  c VARCHAR(4)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "STREAM_OUT"
    SELECT STREAM
      "a",
      "b",
      "c"
    FROM "SOURCE_SQL_STREAM_001";

Is the best answer to add the Lambda data formatter? I'd really like to avoid this.

MrHen asked May 29 '17 16:05

People also ask

Can Lambda write to Kinesis firehose?

Amazon Kinesis Data Firehose captures, transforms, and loads streaming data into downstream services such as Kinesis Data Analytics or Amazon S3. You can write Lambda functions to request additional, customized processing of the data before it is sent downstream.

What is the difference between Kinesis data streams and Firehose?

Kinesis Data Streams is a low-latency streaming service with the capacity to ingest at scale. Kinesis Firehose, on the other hand, is a data transfer service: its primary purpose is loading streaming data into Amazon S3, Splunk, Elasticsearch, and Redshift.

What is the maximum size Kinesis data firehose record can have?

Q: What is a record in Kinesis Data Firehose? A record is the data of interest your data producer sends to a delivery stream. The maximum size of a record (before Base64-encoding) is 1024 KB.


2 Answers

I had a similar requirement to add newlines to the Firehose-generated files. In our application, Firehose is invoked via API Gateway.

This is specified in the Body Mapping Templates under the Integration Request section.

The following mapping template in API Gateway appends a newline to each Kinesis Firehose record.

Method 1:

    #set($payload = "$input.path('$.Record.Data')
")
    {
        "DeliveryStreamName": "$input.path('$.DeliveryStreamName')",
        "Record": {
            "Data": "$util.base64Encode($payload)"
        }
    }

This works perfectly if you are invoking firehose via API Gateway.
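Under my assumptions about the incoming request body, the template's effect can be simulated like this (the stream name and payload are hypothetical; the literal line break inside the `#set(...)` string is what appends the newline):

```python
import base64
import json

def simulate_mapping_template(request_body: dict) -> dict:
    """Simulate what the API Gateway body mapping template does:
    pull Record.Data out of the request, append a newline, and
    base64-encode it for Firehose."""
    payload = request_body["Record"]["Data"] + "\n"
    return {
        "DeliveryStreamName": request_body["DeliveryStreamName"],
        "Record": {
            "Data": base64.b64encode(payload.encode("utf-8")).decode("utf-8")
        },
    }

# Hypothetical request body as API Gateway would receive it.
request = {"DeliveryStreamName": "my-stream",
           "Record": {"Data": json.dumps({"a": "foo1"})}}
print(simulate_mapping_template(request))
```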

Thanks & Regards, Srivignesh KN

Srivignesh KN answered Oct 18 '22 10:10


Solution using Python or Node.js

I am using DynamoDB Streams and needed those records saved to S3, so I implemented a Kinesis Firehose stream along with a Lambda function. This worked for getting my records into S3 as JSON strings; however, every record saved to the file in S3 was inline, that is, on one single continuous row. I therefore needed to add a newline at the end of each record so that each record sat on its own line. For my solution, I ended up having to do some base64 decoding/encoding.

Here is how I did it:

  1. When you create your Kinesis Firehose stream, enable "Transform source records with AWS Lambda" (select "Enabled"). If you have already created your stream, you can still enable this feature by editing your existing stream.
  2. At this point you will need to select another Lambda function that performs this transformation. In my case, I needed to add a newline at the end of each record so that when I open the file in a text editor, every entry is on a separate line.

Below is the tested solution code for both Python and Node.js that I used for that second Lambda:

Python solution to add a newline:

import base64

def lambda_handler(event, context):
    # Collect transformed records inside the handler so state does not
    # leak between invocations of a warm Lambda container.
    output = []

    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        print('payload:', payload)

        row_w_newline = payload + "\n"
        # base64.b64encode returns bytes; decode to str so the response
        # is JSON-serializable in the shape Firehose expects.
        row_w_newline = base64.b64encode(row_w_newline.encode('utf-8')).decode('utf-8')

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': row_w_newline
        }
        output.append(output_record)

    print('Processed {} records.'.format(len(event['records'])))

    return {'records': output}
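To sanity-check the transform locally, here is a small standalone harness (it inlines the same decode/append/encode logic so it runs on its own; the record contents are made up):

```python
import base64

def lambda_handler(event, context):
    # Same transform as above: decode, append newline, re-encode.
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode("utf-8")
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode((payload + "\n").encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}

# Hand-built Firehose transformation event with hypothetical records.
event = {
    "records": [
        {"recordId": "1", "data": base64.b64encode(b'{"a": "foo1"}').decode("utf-8")},
        {"recordId": "2", "data": base64.b64encode(b'{"a": "foo2"}').decode("utf-8")},
    ]
}
result = lambda_handler(event, None)
for rec in result["records"]:
    print(base64.b64decode(rec["data"]).decode("utf-8"), end="")
```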

Node.js solution to add a newline:

'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => {
    /* Process the list of records and transform them */
    const output = event.records.map((record) => {
        // Buffer.from replaces the deprecated new Buffer() constructor.
        const entry = Buffer.from(record.data, 'base64').toString('utf8');
        const result = entry + '\n';
        const payload = Buffer.from(result, 'utf8').toString('base64');

        return {
            recordId: record.recordId,
            result: 'Ok',
            data: payload,
        };
    });
    console.log(`Processing completed. Successful records ${output.length}.`);
    callback(null, { records: output });
};

Some good references that helped me piece the Python version together:

  • https://www.youtube.com/watch?v=wRGd2G82Opo&t=242s
  • https://www.youtube.com/watch?v=6_03i26_DrQ

In the original question above, MrHen wanted to do this without a second Lambda. I was able to get it working in the first Lambda, rather than using the Kinesis Firehose transform-source-records feature, by taking the newImage from DynamoDB and doing, in this order: encode, decode, add a newline ("\n"), encode, decode. There is probably a much cleaner way. I chose to go with the transform-source-records feature and the second Lambda function because it seems cleaner to me at this time.

In my case, the single Lambda solution looked like this:

# Not pretty, but it works! Successfully adds a new line to the record.
# newImage comes from the DynamoDB Stream as a Python dictionary object;
# I convert it to a string before running the code below.

newImage = base64.b64encode(newImage.encode('utf-8'))
newImage = base64.b64decode(newImage).decode('utf-8')
newImage = newImage + "\n"
newImage = base64.b64encode(newImage.encode('utf-8'))
newImage = base64.b64decode(newImage).decode('utf-8')
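Note that each encode is immediately undone by the following decode, so the five steps reduce to appending the newline. A quick check (assuming newImage is already a string; the value here is made up):

```python
import base64

newImage = '{"a": "foo1"}'  # hypothetical stringified DynamoDB newImage

# The original five-step version:
step = base64.b64encode(newImage.encode('utf-8'))
step = base64.b64decode(step).decode('utf-8')
step = step + "\n"
step = base64.b64encode(step.encode('utf-8'))
step = base64.b64decode(step).decode('utf-8')

# Equivalent one-liner: each encode/decode pair is a round trip.
direct = newImage + "\n"
print(step == direct)  # True
```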
Amiri answered Oct 18 '22 11:10