 

Streaming in / chunking csv's from S3 to Python

I intend to perform some memory-intensive operations on a very large CSV file stored in S3 using Python, with the intention of moving the script to AWS Lambda. I know I can read the whole CSV into memory, but I will definitely run into Lambda's memory and storage limits with such a large file. Is there any way to stream in, or just read in chunks of, a CSV at a time into Python using boto3/botocore, ideally by specifying row numbers to read in?

Here are some things I've already tried:

1) Using the range parameter in S3.get_object to specify the range of bytes to read in. Unfortunately this means the last row gets cut off in the middle, since there's no way to specify the number of rows to read in. There are some messy workarounds, like scanning for the last newline character, recording the index, and then using that as the starting point for the next byte range, but I'd like to avoid this clunky solution if possible.

2) Using S3 Select to write SQL queries to selectively retrieve data from S3 buckets. Unfortunately the row_numbers SQL function isn't supported, and it doesn't look like there's a way to read in a subset of rows.

Ajjit Narayanan asked Jun 28 '18 14:06





2 Answers

Assuming your file isn't compressed, this should involve reading from a stream and splitting on the newline character. Read a chunk of data, find the last instance of the newline character in that chunk, split and process.

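import boto3

# bucket and key are placeholders for your own bucket name and object key
s3 = boto3.client('s3')
body = s3.get_object(Bucket=bucket, Key=key)['Body']

# number of bytes to read per chunk
chunk_size = 1000000

# the character that we'll split the data with (bytes, not string)
newline = b'\n'
partial_chunk = b''

while True:
    data = body.read(chunk_size)
    chunk = partial_chunk + data

    # If nothing was read and nothing is left over, there is nothing to process
    if chunk == b'':
        break

    if data == b'':
        # End of stream: what remains is a final line with no trailing newline.
        # Without this branch the loop would never terminate on such files.
        last_newline = len(chunk) - 1
    else:
        last_newline = chunk.rfind(newline)

    # write to a smaller file, or work against some piece of data
    result = chunk[0:last_newline+1].decode('utf-8')

    # keep the partial line you've read here
    partial_chunk = chunk[last_newline+1:]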
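
Since the question asks about reading by row count, the same loop can be wrapped in a generator that hands back batches of parsed rows instead of raw byte chunks. What follows is only a minimal sketch and not part of the original answer: iter_lines and iter_row_batches are hypothetical helper names, stream is assumed to be any object with a read(n) method (such as the Body returned by get_object), and it assumes no quoted CSV field contains an embedded newline.

```python
import csv
import io
from itertools import islice


def iter_lines(stream, chunk_size=1_000_000, encoding='utf-8'):
    """Yield decoded lines from a binary stream, reading chunk_size bytes at a time."""
    partial = b''
    while True:
        data = stream.read(chunk_size)
        if not data:
            break
        lines = (partial + data).split(b'\n')
        # The last piece is an incomplete line (or empty); keep it for the next read.
        partial = lines.pop()
        for line in lines:
            yield line.decode(encoding)
    if partial:
        # Final line that had no trailing newline.
        yield partial.decode(encoding)


def iter_row_batches(stream, rows_per_batch, **kwargs):
    """Yield lists of at most rows_per_batch parsed CSV rows."""
    reader = csv.reader(iter_lines(stream, **kwargs))
    while True:
        batch = list(islice(reader, rows_per_batch))
        if not batch:
            return
        yield batch


# In-memory stand-in for an S3 body so the sketch runs without AWS access.
demo = io.BytesIO(b'id,name\n1,a\n2,b\n3,c')
batches = list(iter_row_batches(demo, rows_per_batch=2, chunk_size=5))
```

With S3, you would pass s3.get_object(Bucket=bucket, Key=key)['Body'] as stream. Only one chunk plus one batch of rows is held in memory at a time. botocore's StreamingBody also provides an iter_lines() method, which may be enough on its own if your botocore version includes it.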

If you have gzipped files, then you need to use BytesIO and the GzipFile class inside the loop; it's a harder problem because you need to retain the Gzip compression details.
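
One way to retain the compression state between reads is an incremental decompressor. The sketch below swaps in zlib.decompressobj from the standard library instead of the BytesIO/GzipFile combination mentioned above; iter_gzip_lines is a hypothetical helper name, stream is assumed to be any object with a read(n) method, and the file is assumed to be a single-member gzip archive.

```python
import gzip
import io
import zlib


def iter_gzip_lines(stream, chunk_size=1_000_000, encoding='utf-8'):
    """Yield decoded lines from a gzip-compressed binary stream."""
    # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
    # The decompressor object carries the compression state across chunks.
    decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
    partial = b''
    while True:
        data = stream.read(chunk_size)
        if not data:
            break
        lines = (partial + decompressor.decompress(data)).split(b'\n')
        # Keep the trailing incomplete line for the next chunk.
        partial = lines.pop()
        for line in lines:
            yield line.decode(encoding)
    # Emit whatever is left once the stream is exhausted.
    tail = (partial + decompressor.flush()).split(b'\n')
    if tail[-1] == b'':
        tail.pop()
    for line in tail:
        yield line.decode(encoding)


# In-memory stand-in for a gzipped S3 object.
compressed = gzip.compress(b'id,name\n1,a\n2,b')
lines = list(iter_gzip_lines(io.BytesIO(compressed), chunk_size=7))
```

Note that this only works when reading the object sequentially from the start; an arbitrary byte range of a gzip file cannot be decompressed on its own.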

Kirk Broadhurst answered Oct 28 '22 12:10



I wrote code similar to @Kirk Broadhurst's, but the connection timed out whenever processing a single chunk took longer than roughly 5 minutes. The following code avoids this by opening a new connection for each chunk.

import gc

import boto3
import pandas as pd
import numpy as np

# The following credentials should not be hard-coded; it's best to load them from the AWS CLI configuration.
region_name = 'region'
aws_access_key_id = 'aws_access_key_id'
aws_secret_access_key = 'aws_secret_access_key'

s3 = boto3.client('s3', region_name=region_name, aws_access_key_id=aws_access_key_id,
                  aws_secret_access_key=aws_secret_access_key)

# head_object returns the object's metadata without opening a download stream.
obj = s3.head_object(Bucket='bucket', Key='key')

total_bytes = obj['ContentLength']
chunk_bytes = 1024*1024*5 # 5 MB as an example.

# The Range header is inclusive on both ends, so each chunk ends one byte
# before the next one starts and the last chunk ends at total_bytes - 1.
chunk_size_list = [f'bytes={start}-{min(start + chunk_bytes, total_bytes) - 1}'
                   for start in range(0, total_bytes, chunk_bytes)]

# Bytes of the incomplete line left over from the previous chunk.
prev_bytes = b''

for i, chunk in enumerate(chunk_size_list):
    # Open a new connection for each chunk to avoid the timeout.
    s3 = boto3.client('s3', region_name=region_name, aws_access_key_id=aws_access_key_id,
                      aws_secret_access_key=aws_secret_access_key)
    byte_obj = s3.get_object(Bucket='bucket', Key='key', Range=chunk)['Body'].read()
    # Split on raw bytes before decoding, so a multi-byte UTF-8 character cut
    # by the range boundary is never decoded in two halves.
    list_obj = (prev_bytes + byte_obj).split(b'\n')
    del byte_obj
    if i < len(chunk_size_list) - 1:
        # The last element is an incomplete line (or b'' if the chunk ended
        # exactly on a newline); carry it over to the next chunk.
        prev_bytes = list_obj.pop()
    elif list_obj[-1] == b'':
        # Final chunk: drop only the empty string after a trailing newline.
        list_obj.pop()
    if not list_obj:
        continue
    # You can use another delimiter instead of ',' below.
    list_of_elements = [line.decode('utf-8').split(',') for line in list_obj]
    del list_obj
    df = pd.DataFrame(list_of_elements)
    del list_of_elements
    gc.collect()
    # You can process your pandas dataframe here, but you need to cast it to correct datatypes.
    # casting na values to numpy nan type.
    na_values = ['', '#N/A', '#N/A N/A', '#NA', '-1.#IND', '-1.#QNAN', '-NaN', '-nan', '1.#IND', '1.#QNAN', 'N/A', 'NA', 'NULL', 'NaN', 'n/a', 'nan', 'null']
    df = df.replace(na_values, np.nan)
    # The keys 0 and 1 are placeholders for your own columns. The DataFrame has
    # integer column labels because no header is set; if the file has a header
    # row, it arrives as the first data row of the first chunk and must be
    # removed before casting.
    dtypes = {0: 'float32', 1: 'category'}
    df = df.astype(dtype=dtypes, copy=False)
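
The line-stitching part of this approach can be checked without AWS access by separating it from the download itself. The following is a minimal sketch, not the answer's own code: iter_lines_by_range and fake_fetch are hypothetical names, and fetch is assumed to be any callable that returns the bytes for an inclusive (start, end) range.

```python
def iter_lines_by_range(fetch, total_bytes, chunk_bytes, encoding='utf-8'):
    """Yield decoded lines, fetching the object one inclusive byte range at a time."""
    partial = b''
    for start in range(0, total_bytes, chunk_bytes):
        end = min(start + chunk_bytes, total_bytes) - 1
        lines = (partial + fetch(start, end)).split(b'\n')
        # Carry the trailing incomplete line over to the next range.
        partial = lines.pop()
        for line in lines:
            yield line.decode(encoding)
    if partial:
        # Final line that had no trailing newline.
        yield partial.decode(encoding)


# In-memory stand-in for an S3 object; 'é' occupies two bytes in UTF-8,
# and chunk_bytes=11 places the range boundary between them.
payload = 'id,name\n1,é\n2,b\n'.encode('utf-8')


def fake_fetch(start, end):
    return payload[start:end + 1]


rows = list(iter_lines_by_range(fake_fetch, len(payload), chunk_bytes=11))
```

Against S3, fetch would wrap a call such as s3.get_object(Bucket='bucket', Key='key', Range=f'bytes={start}-{end}')['Body'].read(), creating a new client per call if the timeout described above is a concern.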
Ramki answered Oct 28 '22 10:10
