How to read a list of parquet files from S3 as a pandas dataframe using pyarrow?

I have a hacky way of achieving this using boto3 (1.4.4), pyarrow (0.4.1) and pandas (0.20.3).

First, I can read a single parquet file locally like this:

import pyarrow.parquet as pq

path = 'parquet/part-r-00000-1e638be4-e31f-498a-a359-47d017a0059c.gz.parquet'
table = pq.read_table(path)
df = table.to_pandas()

I can also read a directory of parquet files locally like this:

import pyarrow.parquet as pq

dataset = pq.ParquetDataset('parquet/')
table = dataset.read()
df = table.to_pandas()

Both work like a charm. Now I want to achieve the same remotely with files stored in an S3 bucket. I was hoping that something like this would work:

dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket') 

But it does not:

OSError: Passed non-file path: s3n://dsn/to/my/bucket

After reading pyarrow's documentation thoroughly, this does not seem possible at the moment. So I came up with the following solution:

Reading a single file from S3 and getting a pandas dataframe:

import io
import boto3
import pyarrow.parquet as pq

buffer = io.BytesIO()
s3 = boto3.resource('s3')
s3_object = s3.Object('bucket-name', 'key/to/parquet/file.gz.parquet')
s3_object.download_fileobj(buffer)
table = pq.read_table(buffer)
df = table.to_pandas()

And here is my hacky, not-so-optimized solution to create a pandas dataframe from an S3 folder path:

import io
import boto3
import pandas as pd
import pyarrow.parquet as pq

bucket_name = 'bucket-name'

def download_s3_parquet_file(s3, bucket, key):
    buffer = io.BytesIO()
    s3.Object(bucket, key).download_fileobj(buffer)
    return buffer

client = boto3.client('s3')
s3 = boto3.resource('s3')
objects_dict = client.list_objects_v2(Bucket=bucket_name, Prefix='my/folder/prefix')
s3_keys = [item['Key'] for item in objects_dict['Contents'] if item['Key'].endswith('.parquet')]
buffers = [download_s3_parquet_file(s3, bucket_name, key) for key in s3_keys]
dfs = [pq.read_table(buffer).to_pandas() for buffer in buffers]
df = pd.concat(dfs, ignore_index=True)

Is there a better way to achieve this? Maybe some kind of connector for pandas using pyarrow? I would like to avoid using pyspark, but if there is no other solution, then I would take it.

asked Jul 11 '17 by Diego Mora Cespedes



2 Answers

You should use the s3fs module, as proposed by yjk21. However, the result of calling ParquetDataset is a pyarrow.parquet.ParquetDataset object. To get a pandas DataFrame you'll want to apply .read_pandas().to_pandas() to it:

import pyarrow.parquet as pq
import s3fs

s3 = s3fs.S3FileSystem()

pandas_dataframe = pq.ParquetDataset('s3://your-bucket/', filesystem=s3).read_pandas().to_pandas()
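If you only need some of the columns, read_pandas accepts a columns argument, so something along these lines should also work (the folder prefix and column names here are just placeholders):

import pyarrow.parquet as pq
import s3fs

s3 = s3fs.S3FileSystem()

# Placeholder prefix and column names -- replace with ones that exist in your bucket.
df = (pq.ParquetDataset('s3://your-bucket/path/to/folder/', filesystem=s3)
        .read_pandas(columns=['user_id', 'event_time'])
        .to_pandas())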
answered Sep 19 '22 by vak


Thanks! Your question actually tells me a lot. This is how I do it now with pandas (0.21.1), which calls pyarrow under the hood, and boto3 (1.3.1).

import boto3
import io
import pandas as pd

# Read single parquet file from S3
def pd_read_s3_parquet(key, bucket, s3_client=None, **args):
    if s3_client is None:
        s3_client = boto3.client('s3')
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    return pd.read_parquet(io.BytesIO(obj['Body'].read()), **args)

# Read multiple parquets from a folder on S3 generated by spark
def pd_read_s3_multiple_parquets(filepath, bucket, s3=None,
                                 s3_client=None, verbose=False, **args):
    if not filepath.endswith('/'):
        filepath = filepath + '/'  # Add '/' to the end
    if s3_client is None:
        s3_client = boto3.client('s3')
    if s3 is None:
        s3 = boto3.resource('s3')
    s3_keys = [item.key for item in s3.Bucket(bucket).objects.filter(Prefix=filepath)
               if item.key.endswith('.parquet')]
    if not s3_keys:
        print('No parquet found in', bucket, filepath)
    elif verbose:
        print('Load parquets:')
        for p in s3_keys:
            print(p)
    dfs = [pd_read_s3_parquet(key, bucket=bucket, s3_client=s3_client, **args)
           for key in s3_keys]
    return pd.concat(dfs, ignore_index=True)

Then you can read multiple parquet files under an S3 folder with:

df = pd_read_s3_multiple_parquets('path/to/folder', 'my_bucket') 

(One can simplify this code a lot I guess.)
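Since **args is passed straight through to pd.read_parquet, it should also be possible to load only a subset of the columns (the column names below are just placeholders):

# Placeholder column names -- only these columns are read from each parquet file.
df = pd_read_s3_multiple_parquets('path/to/folder', 'my_bucket',
                                  columns=['id', 'value'])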

answered Sep 17 '22 by Louis Yang