
Write parquet from AWS Kinesis firehose to AWS S3

I would like to ingest data into S3 from Kinesis Firehose formatted as Parquet. So far I have only found a solution that implies creating an EMR cluster, but I am looking for something cheaper and faster, like storing the received JSON as Parquet directly from Firehose or using a Lambda function.

Thank you very much, Javi.

asked Aug 01 '17 by bracana


People also ask

Can Kinesis Firehose convert to Parquet?

You can now configure your Kinesis Data Firehose delivery stream to automatically convert data into Parquet or ORC format before delivering to your S3 bucket.

Can Kinesis Data Streams write to S3?

Kinesis Data Analytics for Apache Flink cannot write data to Amazon S3 with server-side encryption enabled on Kinesis Data Analytics. You can create the Kinesis stream and Amazon S3 bucket using the console.

Can Kinesis Firehose transform data?

Kinesis Data Firehose can invoke your Lambda function to transform incoming source data and deliver the transformed data to destinations. You can enable Kinesis Data Firehose data transformation when you create your delivery stream.
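For reference, a Firehose data-transformation Lambda receives a batch of base64-encoded records and must return each one with its recordId, a result status, and the re-encoded data. Below is a minimal sketch of that handler contract; the transformation step itself is just a placeholder:

    import base64
    import json

    def lambda_handler(event, context):
        # Minimal Kinesis Data Firehose transformation handler (sketch)
        output = []
        for record in event['records']:
            # Firehose delivers each record base64-encoded
            payload = json.loads(base64.b64decode(record['data']))
            # ... transform the payload here (placeholder) ...
            output.append({
                'recordId': record['recordId'],
                'result': 'Ok',  # or 'Dropped' / 'ProcessingFailed'
                'data': base64.b64encode(
                    json.dumps(payload).encode('utf-8')).decode('utf-8'),
            })
        return {'records': output}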

Can Kinesis Firehose read from S3?

Yes, Kinesis Data Firehose can back up all un-transformed records to your S3 bucket concurrently while delivering transformed records to destination. Source record backup can be enabled when you create or update your delivery stream.

How do I use Amazon Kinesis Data Firehose with S3?

Amazon Kinesis Data Firehose can convert the format of your input data from JSON to Apache Parquet or Apache ORC before storing the data in Amazon S3. Parquet and ORC are columnar data formats that save space and enable faster queries. To enable it, go to your Firehose stream and click Edit.

How do I use Kinesis Firehose with Apache Parquet?

Start by opening the Kinesis Data Firehose console and creating a new data delivery stream. Give it a name, and associate it with the Kinesis data stream that you created in Step 2. As shown in the following screenshot, enable Record format conversion (1) and choose Apache Parquet (2). As you can see, Apache ORC is also supported.

How do I convert data from Kinesis Data Firehose to ORC/Parquet?

A serializer to convert the data to the target columnar storage format (Parquet or ORC) – You can choose one of two types of serializers: ORC SerDe or Parquet SerDe. If you enable record format conversion, you can't set your Kinesis Data Firehose destination to be Amazon OpenSearch Service (OpenSearch Service), Amazon Redshift, or Splunk.

What is the difference between Amazon S3 and Amazon Kinesis?

Amazon Kinesis is a fully managed service provided by Amazon that allows users to process data from a diverse set of sources and stream it to various destinations in real time. It supports streaming data to several storage destinations, including Amazon S3.




3 Answers

Good news, this feature was released today!

Amazon Kinesis Data Firehose can convert the format of your input data from JSON to Apache Parquet or Apache ORC before storing the data in Amazon S3. Parquet and ORC are columnar data formats that save space and enable faster queries.

To enable it, go to your Firehose stream and click Edit. You should see the Record format conversion section, as in the screenshot below:

[screenshot: Record format conversion settings]

See the documentation for details: https://docs.aws.amazon.com/firehose/latest/dev/record-format-conversion.html
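The same setting can also be configured programmatically when the delivery stream is created. Here is a minimal boto3 sketch, assuming the conversion schema already exists as a table in the AWS Glue Data Catalog; the stream name, role ARN, bucket ARN, Glue database/table and region are placeholders:

    import boto3

    firehose = boto3.client('firehose')

    firehose.create_delivery_stream(
        DeliveryStreamName='json-to-parquet-stream',  # placeholder
        DeliveryStreamType='DirectPut',
        ExtendedS3DestinationConfiguration={
            'RoleARN': 'arn:aws:iam::123456789012:role/firehose-role',  # placeholder
            'BucketARN': 'arn:aws:s3:::ExampleS3Bucket',                # placeholder
            'BufferingHints': {'SizeInMBs': 128, 'IntervalInSeconds': 900},
            'CompressionFormat': 'UNCOMPRESSED',  # Parquet applies its own compression
            'DataFormatConversionConfiguration': {
                'Enabled': True,
                # How the incoming JSON records are parsed
                'InputFormatConfiguration': {'Deserializer': {'OpenXJsonSerDe': {}}},
                # Target columnar format: ParquetSerDe (or OrcSerDe for ORC)
                'OutputFormatConfiguration': {'Serializer': {'ParquetSerDe': {}}},
                # Schema comes from a Glue Data Catalog table (placeholders below)
                'SchemaConfiguration': {
                    'RoleARN': 'arn:aws:iam::123456789012:role/firehose-role',
                    'DatabaseName': 'my_glue_database',
                    'TableName': 'my_glue_table',
                    'Region': 'eu-west-1',
                    'VersionId': 'LATEST',
                },
            },
        },
    )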

answered Oct 10 '22 by Vlad Holubiev


After dealing with the AWS support service and a hundred different implementations, I would like to explain what I have achieved.

Finally, I created a Lambda function that processes every file generated by Kinesis Firehose, classifies my events according to the payload, and stores the result as Parquet files in S3.

Doing that is not very easy:

  1. First of all, you should create a Python virtual env including all the required libraries (in my case Pandas, NumPy, Fastparquet, etc.). As the resulting file (which includes all the libraries plus my Lambda function) is heavy, it is necessary to build it on an EC2 instance; I have used the one included in the free tier. To create the virtual env, follow these steps:

    • Log in to the EC2 instance
    • Create a folder called lambda (or any other name)
    • sudo yum -y update
    • sudo yum -y upgrade
    • sudo yum -y groupinstall "Development Tools"
    • sudo yum -y install blas
    • sudo yum -y install lapack
    • sudo yum -y install atlas-sse3-devel
    • sudo yum install python27-devel python27-pip gcc
    • virtualenv env
    • source env/bin/activate
    • pip install boto3
    • pip install fastparquet
    • pip install pandas
    • pip install thriftpy
    • pip install s3fs
    • pip install (any other required library)
    • find ~/lambda/env/lib*/python2.7/site-packages/ -name "*.so" | xargs strip
    • pushd env/lib/python2.7/site-packages/
    • zip -r -9 -q ~/lambda.zip *
    • popd
    • pushd env/lib64/python2.7/site-packages/
    • zip -r -9 -q ~/lambda.zip *
    • popd
  2. Create the lambda_function properly:

    import json
    import boto3
    import datetime as dt
    import urllib
    import zlib
    import s3fs
    from fastparquet import write
    import pandas as pd
    import numpy as np
    import time
    
    def _send_to_s3_parquet(df):
        s3_fs = s3fs.S3FileSystem()
        s3_fs_open = s3_fs.open
        # FIXME add something else to the key or it will overwrite the file 
        key = 'mybeautifullfile.parquet.gzip'
        # Include partitions! key1 and key2
        write('ExampleS3Bucket' + '/key1=value/key2=othervalue/' + key, df,
              compression='GZIP', open_with=s3_fs_open)
    
    def lambda_handler(event, context):
        # Get the object from the event and show its content type
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'])
        try:
            s3 = boto3.client('s3')
            response = s3.get_object(Bucket=bucket, Key=key)
            data = response['Body'].read()
            decoded = data.decode('utf-8')
            lines = decoded.split('\n')
            # Do anything you like with the dataframe (here I classify the events
            # and write them to different folders in S3 according to the values
            # of the columns that I want)
            df = pd.DataFrame(lines)
            _send_to_s3_parquet(df)
        except Exception as e:
            print('Error getting object {} from bucket {}.'.format(key, bucket))
            raise e
    
  3. Copy the lambda function into lambda.zip and deploy it:

    • Go back to your EC2 instance and add the desired lambda function to the zip: zip -9 lambda.zip lambda_function.py (lambda_function.py is the file generated in step 2)
    • Copy the generated zip file to S3, as it is too heavy to be deployed without going through S3: aws s3 cp lambda.zip s3://support-bucket/lambda_packages/
    • Deploy the lambda function: aws lambda update-function-code --function-name <your-function-name> --s3-bucket support-bucket --s3-key lambda_packages/lambda.zip
  4. Trigger the lambda function to be executed whenever you like, e.g. each time a new file is created in S3 (see the boto3 sketch after this list), or you could even associate the lambda function with Firehose. (I did not choose the latter option because the Lambda limits are lower than the Firehose limits: you can configure Firehose to write a file every 128 MB or 15 minutes, but if you associate this lambda function with Firehose, the lambda function will be executed every 3 minutes or 5 MB. In my case I had the problem of generating a lot of little parquet files, as each time the lambda function is launched I generate at least 10 files.)
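For completeness, here is a minimal boto3 sketch of how the S3 trigger from step 4 could be wired up; the account id, bucket name, region and function name are placeholders:

    import boto3

    # Placeholders: replace with your own account, bucket, region and function name
    account_id = '123456789012'
    bucket = 'ExampleS3Bucket'
    function_name = 'my-parquet-lambda'
    function_arn = 'arn:aws:lambda:eu-west-1:%s:function:%s' % (account_id, function_name)

    # 1. Allow S3 to invoke the Lambda function
    boto3.client('lambda').add_permission(
        FunctionName=function_name,
        StatementId='s3-invoke-parquet-lambda',
        Action='lambda:InvokeFunction',
        Principal='s3.amazonaws.com',
        SourceArn='arn:aws:s3:::' + bucket,
        SourceAccount=account_id,
    )

    # 2. Notify the Lambda function every time a new object is created in the bucket
    boto3.client('s3').put_bucket_notification_configuration(
        Bucket=bucket,
        NotificationConfiguration={
            'LambdaFunctionConfigurations': [{
                'LambdaFunctionArn': function_arn,
                'Events': ['s3:ObjectCreated:*'],
            }]
        },
    )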

answered Oct 10 '22 by bracana


Amazon Kinesis Firehose receives streaming records and can store them in Amazon S3 (or Amazon Redshift or Amazon Elasticsearch Service).

Each record can be up to 1000KB.

[diagram: Kinesis Firehose flow]

However, records are appended together into a text file, with batching based upon time or size. Traditionally, records are in JSON format.

You will be unable to send a parquet file because it will not conform to this file format.

It is possible to trigger a Lambda data transformation function, but this will not be capable of outputting a parquet file either.

In fact, given the nature of parquet files, it is unlikely that you could build them one record at a time. Since Parquet is a columnar storage format, I suspect that the files really need to be created in a batch rather than having data appended per record.

Bottom line: Nope.

answered Oct 10 '22 by John Rotenstein