Getting a data stream from a zipped file sitting in a S3 bucket using boto3 lib and AWS Lambda

I am trying to create a serverless processor for my cron job. In this job I receive a zipped file in my S3 bucket from one of my clients. The file is around 50 MB, but once unzipped it grows to 1.5 GB, and there is a hard limit of 500 MB on the space available on AWS Lambda, so I cannot download the file from the S3 bucket and unzip it on my Lambda. I was able to unzip the file and stream the content line by line from S3 using funzip in a unix script:

for x in $files ; do echo -n "$x: " ; timeout 5 aws s3 cp $monkeydir/$x - | funzip ; done

My bucket name: MonkeyBusiness, key: /Daily/Business/Banana/{current-date}, object: banana.zip

Now I am trying to achieve the same output using boto3: how can I stream the zipped content, unzip the stream, save the content in separate files of 10,000 lines each, and upload the chunked files back to S3? I need guidance as I am pretty new to AWS and boto3.

Please let me know if you need more details about the job.

The solution suggested below is not applicable here, because the zlib documentation clearly states that the library handles the gzip format, while my question is about the zip format.

import zlib

def stream_gzip_decompress(stream):
    dec = zlib.decompressobj(32 + zlib.MAX_WBITS)  # offset 32 to skip the header
    for chunk in stream:
        rv = dec.decompress(chunk)
        if rv:
            yield rv 
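
For reference, this is how that snippet would typically be driven with a boto3 streaming body (a hypothetical sketch; the bucket, key, and handle() callback are placeholders, and it still only applies to gzip, not zip):

import boto3

s3 = boto3.resource('s3', 'us-east-1')
body = s3.Object('some-bucket', 'some-file.gz').get()['Body']  # placeholder bucket/key
for decompressed in stream_gzip_decompress(body.iter_chunks()):
    handle(decompressed)  # placeholder per-chunk handler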
asked Sep 05 '17 by Shek

2 Answers

I used BytesIO to read the compressed file into a buffer object, then used zipfile to open the archived entry as an uncompressed stream, and I was able to get the data line by line.

import io
import sys
import zipfile

import boto3

s3 = boto3.resource('s3', 'us-east-1')


def stream_zip_file():
    obj = s3.Object(
        bucket_name='MonkeyBusiness',
        key='/Daily/Business/Banana/{current-date}/banana.zip'
    )
    # Read the whole compressed object (about 50 MB) into an in-memory buffer.
    buffer = io.BytesIO(obj.get()["Body"].read())
    print(buffer)
    # Open the buffer as a zip archive and stream the first entry line by line,
    # without ever writing the 1.5 GB uncompressed file to disk.
    z = zipfile.ZipFile(buffer)
    foo2 = z.open(z.infolist()[0])
    print(sys.getsizeof(foo2))
    line_counter = 0
    for _ in foo2:
        line_counter += 1
    print(line_counter)
    z.close()


if __name__ == '__main__':
    stream_zip_file()
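
To cover the rest of the original requirement (writing a new object every 10,000 lines and uploading it back to S3), the same loop can be extended roughly as below. This is an untested sketch: the output key pattern, the lines_per_chunk parameter, and the bucket/key/prefix arguments are assumptions, not something from the question.

import io
import zipfile

import boto3

s3 = boto3.resource('s3', 'us-east-1')


def split_and_upload(bucket, key, out_prefix, lines_per_chunk=10000):
    """Stream one zipped S3 object and re-upload it as N-line chunk files."""
    obj = s3.Object(bucket_name=bucket, key=key)
    buffer = io.BytesIO(obj.get()["Body"].read())
    with zipfile.ZipFile(buffer) as z:
        with z.open(z.infolist()[0]) as entry:
            chunk, chunk_no = [], 0
            for line in entry:
                chunk.append(line)
                if len(chunk) == lines_per_chunk:
                    # Hypothetical output key layout: <prefix>000000.txt, <prefix>000001.txt, ...
                    s3.Object(bucket, '%s%06d.txt' % (out_prefix, chunk_no)).put(Body=b''.join(chunk))
                    chunk, chunk_no = [], chunk_no + 1
            if chunk:  # flush the last, partially filled chunk
                s3.Object(bucket, '%s%06d.txt' % (out_prefix, chunk_no)).put(Body=b''.join(chunk))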
answered Oct 08 '22 by Shek


This is not an exact answer, but you can try the following approach.

First, adapt the existing answers about reading a gzip file with limited memory; that approach lets you stream the file chunk by chunk. boto3's S3 put_object() and upload_fileobj also appear to accept streams.

You need to combine the code mentioned above with decompression along these lines:

import io
import gzip

import boto3

s3 = boto3.client('s3')

# s3_data holds the compressed bytes already fetched from S3
stream = io.BytesIO(s3_data)
# GzipFile is itself file-like, so upload_fileobj can stream the decompressed
# data back to S3 in chunks (it performs a multipart upload under the hood)
with gzip.GzipFile(fileobj=stream) as decompressor:
    s3.upload_fileobj(decompressor, "bucket", "key")

I cannot guarantee that the above code works; it is only meant to give you the idea of decompressing the file and re-uploading it in chunks. You might even need to pipe the decompressed data into a BytesIO buffer before passing it to upload_fileobj. It will take a lot of testing.
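
If you want explicit control over the chunking instead of relying on upload_fileobj, a hedged sketch using the S3 multipart upload API could look like this (the "bucket"/"key" names are placeholders, s3_data is assumed to hold the compressed bytes, and every part except the last must be at least 5 MiB):

import io
import gzip

import boto3

s3 = boto3.client('s3')
blocksize = 8 * 1024 * 1024  # 8 MiB parts (S3 requires >= 5 MiB for all but the last part)

# s3_data is assumed to hold the gzip-compressed bytes fetched from S3
mpu = s3.create_multipart_upload(Bucket="bucket", Key="key")
parts = []
with gzip.GzipFile(fileobj=io.BytesIO(s3_data)) as decompressor:
    part_number = 1
    while True:
        chunk = decompressor.read(blocksize)
        if not chunk:
            break
        resp = s3.upload_part(
            Bucket="bucket", Key="key", UploadId=mpu["UploadId"],
            PartNumber=part_number, Body=chunk,
        )
        parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})
        part_number += 1
s3.complete_multipart_upload(
    Bucket="bucket", Key="key", UploadId=mpu["UploadId"],
    MultipartUpload={"Parts": parts},
)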

If you don't need to decompress the file right away, my suggestion is to use Lambda to put the file reference into an SQS queue. When there are "enough" files, trigger a Spot instance (which will be pretty cheap) that reads the queue and processes the files.
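
A minimal sketch of that queueing idea, assuming the Lambda is triggered by an S3 event notification (the queue URL is a placeholder):

import json

import boto3

sqs = boto3.client('sqs')
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/incoming-zips"  # placeholder


def lambda_handler(event, context):
    # Forward each newly arrived object's location to SQS instead of processing it here.
    for record in event["Records"]:
        sqs.send_message(
            QueueUrl=QUEUE_URL,
            MessageBody=json.dumps({
                "bucket": record["s3"]["bucket"]["name"],
                "key": record["s3"]["object"]["key"],
            }),
        )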

answered Oct 08 '22 by mootmoot