I intend to perform some memory-intensive operations on a very large CSV file stored in S3 using Python, with the intention of moving the script to AWS Lambda. I know I can read the whole CSV into memory, but I will definitely run into Lambda's memory and storage limits with such a large file. Is there any way to stream in, or read in chunks of, a CSV at a time into Python using boto3/botocore, ideally by specifying row numbers to read in?
Here are some things I've already tried:
1) Using the Range parameter in S3.get_object to specify the range of bytes to read. Unfortunately this means the last rows get cut off in the middle, since there's no way to specify the number of rows to read. There are some messy workarounds, like scanning for the last newline character, recording its index, and then using that as the starting point of the next byte range, but I'd like to avoid this clunky solution if possible.
2) Using S3 Select to write SQL queries to selectively retrieve data from S3 buckets. Unfortunately the row_number SQL function isn't supported, and it doesn't look like there's a way to read in a subset of rows.
Assuming your file isn't compressed, this should involve reading from a stream and splitting on the newline character. Read a chunk of data, find the last instance of the newline character in that chunk, split and process.
s3 = boto3.client('s3')
body = s3.get_object(Bucket=bucket, Key=key)['Body']

# number of bytes to read per chunk
chunk_size = 1000000
# the character that we'll split the data with (bytes, not string)
newline = b'\n'
partial_chunk = b''

while True:
    data = body.read(chunk_size)
    # If nothing was read there is nothing left to process
    if not data:
        # handle a final line that has no trailing newline
        if partial_chunk:
            result = partial_chunk.decode('utf-8')
        break
    chunk = partial_chunk + data
    last_newline = chunk.rfind(newline)
    # write to a smaller file, or work against some piece of data
    result = chunk[:last_newline + 1].decode('utf-8')
    # keep the partial line you've read here
    partial_chunk = chunk[last_newline + 1:]
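To sanity-check the loop without touching S3, you can substitute an in-memory stream for the body, since io.BytesIO exposes the same read interface as the StreamingBody (the sample data and tiny chunk size here are made up for illustration):

```python
import io

body = io.BytesIO(b"x,1\ny,2\nz,3\n")  # stand-in for the S3 StreamingBody
chunk_size = 5
newline = b'\n'
partial_chunk = b''
rows = []

while True:
    data = body.read(chunk_size)
    if not data:
        # flush a final line that has no trailing newline
        if partial_chunk:
            rows.append(partial_chunk.decode('utf-8'))
        break
    chunk = partial_chunk + data
    last_newline = chunk.rfind(newline)
    # collect the complete lines from this chunk
    rows.extend(chunk[:last_newline + 1].decode('utf-8').splitlines())
    # carry the partial line over to the next chunk
    partial_chunk = chunk[last_newline + 1:]

# rows == ['x,1', 'y,2', 'z,3']
```

Each iteration only ever holds one chunk plus one partial line in memory, which is what keeps this within Lambda's limits.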
If you have gzipped files, then you need to use io.BytesIO and the gzip.GzipFile class inside the loop; it's a harder problem because you need to retain the gzip decompression state across chunks.
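As a rough sketch of the gzip case, you can wrap the stream in gzip.GzipFile and apply the same newline-splitting idea to the decompressed output (the in-memory compressed data here stands in for the S3 body, and the tiny chunk size is just for the demo):

```python
import gzip
import io

# stand-in for a gzipped S3 body
compressed = io.BytesIO(gzip.compress(b"a,1\nb,2\nc,3\n"))
gz = gzip.GzipFile(fileobj=compressed)

partial = b''
lines = []
while True:
    data = gz.read(8)  # read small decompressed chunks
    if not data:
        if partial:
            lines.append(partial.decode('utf-8'))
        break
    chunk = partial + data
    idx = chunk.rfind(b'\n')
    # complete lines up to the last newline
    lines.extend(chunk[:idx].decode('utf-8').split('\n'))
    # carry the incomplete tail into the next chunk
    partial = chunk[idx + 1:]

# lines == ['a,1', 'b,2', 'c,3']
```

GzipFile keeps the decompression state internally between reads, which is exactly the detail that makes hand-rolling this harder than the uncompressed case.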
I have developed code similar to @Kirk Broadhurst's, but a connection timeout was happening when the processing time for each chunk exceeded roughly 5 minutes. The following code works around this by opening a new connection for each chunk.
import gc

import boto3
import numpy as np
import pandas as pd

# The following credentials should not be hard coded; it's best to get these from your CLI configuration.
region_name = 'region'
aws_access_key_id = 'aws_access_key_id'
aws_secret_access_key = 'aws_secret_access_key'

s3 = boto3.client('s3', region_name=region_name, aws_access_key_id=aws_access_key_id,
                  aws_secret_access_key=aws_secret_access_key)
obj = s3.get_object(Bucket='bucket', Key='key')
total_bytes = obj['ContentLength']
chunk_bytes = 1024 * 1024 * 5  # 5 MB as an example
total_chunks = -(-total_bytes // chunk_bytes)  # ceiling division

chunk_ranges = [(i * chunk_bytes, (i + 1) * chunk_bytes - 1) for i in range(total_chunks)]
a, b = chunk_ranges[-1]
chunk_ranges[-1] = (a, total_bytes - 1)  # byte ranges are inclusive
chunk_ranges = [f'bytes={a}-{b}' for a, b in chunk_ranges]

prev_str = ''
for chunk_range in chunk_ranges:
    # open a fresh connection for each chunk to avoid the timeout
    s3 = boto3.client('s3', region_name=region_name, aws_access_key_id=aws_access_key_id,
                      aws_secret_access_key=aws_secret_access_key)
    byte_obj = s3.get_object(Bucket='bucket', Key='key', Range=chunk_range)['Body'].read()
    str_obj = byte_obj.decode('utf-8')
    del byte_obj
    list_obj = str_obj.split('\n')
    # You can use another delimiter instead of ',' below.
    if len(prev_str.split(',')) < len(list_obj[1].split(',')) or len(list_obj[0].split(',')) < len(list_obj[1].split(',')):
        # the previous chunk ended mid-line: glue the fragment onto the first line
        list_obj[0] = prev_str + list_obj[0]
    else:
        list_obj = [prev_str] + list_obj
    # the last line may be incomplete; carry it into the next chunk
    prev_str = list_obj[-1]
    del str_obj, list_obj[-1]
    list_of_elements = [st.split(',') for st in list_obj]
    del list_obj
    df = pd.DataFrame(list_of_elements)
    del list_of_elements
    gc.collect()
    # You can process your pandas dataframe here, but you need to cast it to correct datatypes first.
    # Casting na values to numpy nan type.
    na_values = ['', '#N/A', '#N/A N/A', '#NA', '-1.#IND', '-1.#QNAN', '-NaN', '-nan',
                 '1.#IND', '1.#QNAN', 'N/A', 'NA', 'NULL', 'NaN', 'n/a', 'nan', 'null']
    df = df.replace(na_values, np.nan)
    dtypes = {'col1': 'float32', 'col2': 'category'}  # example column names and dtypes
    df = df.astype(dtype=dtypes, copy=False)
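The carry-over logic above can be exercised on an in-memory byte string before pointing it at a real bucket. This sketch mimics the byte-range requests with slicing, using a simplified stitching rule (always glue the carried fragment onto the first piece, which also handles a range boundary that lands exactly on a newline); the data and chunk size are made up:

```python
data = b"id,val\n1,a\n2,b\n3,c\n"  # stand-in for the S3 object
chunk_bytes = 7
total = len(data)
# (start, end) pairs mimicking the Range requests
ranges = [(i, min(i + chunk_bytes, total)) for i in range(0, total, chunk_bytes)]

rows, partial = [], ''
for a, b in ranges:
    text = data[a:b].decode('utf-8')
    pieces = text.split('\n')
    pieces[0] = partial + pieces[0]  # glue the fragment carried from the previous range
    partial = pieces.pop()           # the last piece may be an incomplete line
    rows.extend(pieces)
if partial:
    rows.append(partial)             # final line with no trailing newline

# rows == ['id,val', '1,a', '2,b', '3,c']
```

If a range happens to end exactly on a newline, the carried fragment is the empty string, so the unconditional glue is still correct.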