
Write Pandas Dataframe to_csv StringIO instead of file

The objective of this code is to read an existing CSV file from a specified S3 bucket into a DataFrame, filter the DataFrame down to the desired columns, and then write the filtered DataFrame to a CSV object using StringIO that I can upload to a different S3 bucket.

Everything works right now except the function "prepare_file_for_upload". Below is the full code:

from io import StringIO
import io  # unused at the moment
import logging
import pandas as pd
import boto3
from botocore.exceptions import ClientError

FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

# S3 parameters
source_bucket = 'REPLACE'
source_folder = 'REPLACE/'
dest_bucket = 'REPLACE'
dest_folder = 'REPLACE'
output_name = 'REPLACE'

def get_file_name():
    try:
        s3 = boto3.client("s3")
        logging.info(f'Determining filename from: {source_bucket}/{source_folder}')
        bucket_path = s3.list_objects(Bucket=source_bucket, Prefix=source_folder)
        file_name = [key['Key'] for key in bucket_path['Contents']][1]
        logging.info(file_name)
        return file_name
    except ClientError as e:
        logging.info(f'Unable to determine file name from bucket {source_bucket}/{source_folder}')
        logging.info(e)

def get_file_data(file_name):
    try:
        s3 = boto3.client("s3")
        logging.info(f'file name from get data: {file_name}')
        obj = s3.get_object(Bucket=source_bucket, Key=file_name)
        body = obj['Body']
        body_string = body.read().decode('utf-8')
        file_data = pd.read_csv(StringIO(body_string))
        #logging.info(file_data)
        return file_data
    except ClientError as e:
        logging.info(f'Unable to read {file_name} into dataframe')
        logging.info(e)

def filter_file_data(file_data):
    try:
        all_columns = list(file_data.columns)
        columns_used = ('col_1', 'col_2', 'col_3')
        desired_columns = [x for x in all_columns if x in columns_used]
        filtered_data = file_data[desired_columns]
        logging.info(type(filtered_data))  # for testing
        return filtered_data
    except Exception as e:
        logging.info('Unable to filter file')
        logging.info(e)

The block below is where I am attempting to write the existing DataFrame that was passed to the function using the "to_csv" method with StringIO instead of creating a local file. to_csv will write to a local file, but it does not work with the buffer (yes, I tried moving the buffer cursor to the start position afterwards, and still nothing):

def prepare_file_for_upload(filtered_data):  # this is the function block where I am stuck
    try:
        buffer = StringIO()
        output_name = 'FILE_NAME.csv'
        # code below is writing to file but can not get to write to buffer
        output_file = filtered_data.to_csv(buffer, sep=',')
        df = pd.DataFrame(buffer)  # for testing
        logging.info(df)  # for testing
        return output_file
    except Exception as e:
        logging.info(f'Unable to prepare {output_name} for upload')
        logging.info(e)

def upload_file(adjusted_file):
    try:
        #dest_key = f'{dest_folder}/{output_name}'
        dest_key = f'{output_name}'
        s3 = boto3.resource('s3')
        s3.meta.client.upload_file(adjusted_file, dest_bucket, dest_key)
    except ClientError as e:
        logging.info(f'Unable to upload {output_name} to {dest_key}')
        logging.info(e)

def execute_program():
    file_name = get_file_name()
    file_data = get_file_data(file_name)
    filtered_data = filter_file_data(file_data)
    adjusted_file = prepare_file_for_upload(filtered_data)
    upload_file = upload_file(adjusted_file)

if __name__ == '__main__':
    execute_program()
asked Jun 21 '18 by Sophistafunk



2 Answers

The following solution worked for me:

csv_buffer = StringIO()
filtered_data.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(dest_bucket, output_name).put(Body=csv_buffer.getvalue())
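
For context, here is a minimal sketch of how that fix could slot into the prepare_file_for_upload and upload_file functions from the question. It assumes the dest_bucket and output_name settings defined at the top of the original script; note that getvalue() returns the buffer's entire contents regardless of the current stream position, so no seek(0) is needed with this approach.

from io import StringIO
import boto3

# Assumed stand-ins for the module-level settings from the question.
dest_bucket = 'REPLACE'
output_name = 'FILE_NAME.csv'

def prepare_file_for_upload(filtered_data):
    # Write the DataFrame into an in-memory text buffer instead of a
    # local file. to_csv() returns None when given a buffer, so the
    # buffer itself is what gets passed along for upload.
    buffer = StringIO()
    filtered_data.to_csv(buffer, sep=',', index=False)
    return buffer

def upload_file(buffer):
    # getvalue() returns the full buffer contents no matter where the
    # stream position currently is, so no seek(0) is required here.
    s3 = boto3.resource('s3')
    s3.Object(dest_bucket, output_name).put(Body=buffer.getvalue())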
answered by Sophistafunk

When working with a BytesIO object (the same applies to StringIO), pay careful attention to the order of operations. In your code, you instantiate the buffer and then fill it via a call to to_csv(). So far, so good. But one thing you have to manage with a BytesIO object, unlike in a file workflow, is the stream position.

After writing data to the stream, the stream position is at the end of the stream. If you try to read from that position to upload, you will read nothing! The operation will complete, leaving you scratching your head as to why no results are written to S3. Add a call to seek() with the argument 0 to your function. Here is a demo program:

from io import BytesIO
import boto3
import pandas
from pandas import util

df = util.testing.makeMixedDataFrame()
s3_resource = boto3.resource("s3")
buffer = BytesIO()
df.to_csv(buffer, sep=",", index=False, mode="wb", encoding="UTF-8")

# The following call to `tell()` returns the stream position. 0 is the beginning of the file.
buffer.tell()
>> 134

# Reposition the stream to the beginning by calling `seek(0)` before uploading
buffer.seek(0)
s3_resource.Object("test-bucket", "test_df_from_resource.csv").put(Body=buffer.getvalue())

You should get a response similar to the following (with actual values in place of the placeholders):

>> {'ResponseMetadata': {'RequestId': 'request-id-value',
  'HostId': '###########',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '############',
   'x-amz-request-id': '00000',
   'date': 'Tue, 31 Aug 2021 00:00:00 GMT',
   'x-amz-server-side-encryption': 'value',
   'etag': '"xxxx"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"xxxx"',
 'ServerSideEncryption': 'value'}

Changing the code to move the stream position should solve the issues you were facing. It is also worth mentioning that Pandas had a bug that caused unexpected behavior when writing to a bytes object. It has been fixed, and the sample I provided assumes you are running a version of Python greater than 3.8 and a version of Pandas greater than 1.3.2. Further information on IO can be found in the Python documentation.
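
As a usage note, the seek(0) matters most when the buffer is handed to an API that reads from the current stream position, such as boto3's upload_fileobj(); getvalue(), as used in the demo above, copies the entire contents regardless of position. Here is a minimal sketch of that variant, assuming the same hypothetical test bucket name:

from io import BytesIO
import boto3
import pandas as pd

df = pd.DataFrame({'col_1': [1, 2], 'col_2': ['a', 'b']})

buffer = BytesIO()
df.to_csv(buffer, index=False, mode='wb')

# upload_fileobj() reads from the current stream position onward, so
# without this seek(0) it would read zero bytes and upload an empty object.
buffer.seek(0)

s3 = boto3.client('s3')
s3.upload_fileobj(buffer, 'test-bucket', 'test_df_from_fileobj.csv')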

answered by Nathan