I have a somewhat large (~20 GB) partitioned dataset in parquet format. I would like to read specific partitions from the dataset using pyarrow. I thought I could accomplish this with pyarrow.parquet.ParquetDataset, but that doesn't seem to be the case. Here is a small example to illustrate what I want.
To create a random dataset:
from collections import OrderedDict
from itertools import product, chain
from uuid import uuid4
import os
from glob import glob
import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow.parquet import ParquetWriter, ParquetDataset
def get_partitions(basepath, partitions):
    """Generate directory hierarchy for a partitioned dataset

    data
    ├── part1=foo
    │   ├── part2=True
    │   └── part2=False
    └── part1=bar
        ├── part2=True
        └── part2=False
    """
    path_tmpl = '/'.join(['{}={}'] * len(partitions))  # part=value
    path_tmpl = '{}/{}'.format(basepath, path_tmpl)    # part1=val/part2=val
    parts = [product([part], vals) for part, vals in partitions.items()]
    parts = [i for i in product(*parts)]
    return [path_tmpl.format(*tuple(chain.from_iterable(i))) for i in parts]
partitions = OrderedDict(part1=['foo', 'bar'], part2=[True, False])
parts = get_partitions('data', partitions)
for part in parts:
    # 3 columns, 5 rows
    data = [pa.array(np.random.rand(5)) for i in range(3)]
    table = pa.Table.from_arrays(data, ['a', 'b', 'c'])
    os.makedirs(part, exist_ok=True)
    out = ParquetWriter('{}/{}.parquet'.format(part, uuid4()),
                        table.schema, flavor='spark')
    out.write_table(table)
    out.close()
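If you want to sanity-check what the snippet produced: with the OrderedDict above, get_partitions returns the four leaf paths data/part1=foo/part2=True, data/part1=foo/part2=False, data/part1=bar/part2=True and data/part1=bar/part2=False, and each leaf ends up with one small file. A quick check, assuming the data directory from the example:
from glob import glob
# With the example above there should be exactly four files,
# one per part1=*/part2=* leaf directory.
print(sorted(glob('data/part1=*/part2=*/*.parquet')))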
I want to read all values for partition one, and only True for partition 2. With pandas.read_parquet that's not possible; I always have to read the whole column. I tried the following with pyarrow:
parts2 = OrderedDict(part1=['foo', 'bar'], part2=[True])
parts2 = get_partitions('data', parts2)
files = [glob('{}/*'.format(dirpath)) for dirpath in parts2]
files = [i for i in chain.from_iterable(files)]
df2 = ParquetDataset(files).read().to_pandas()
That doesn't work either:
>>> df2.columns
Index(['a', 'b', 'c'], dtype='object')
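As an aside (not part of the original question): if you point ParquetDataset at the dataset root rather than an explicit file list, it discovers the part1=/part2= directory layout and the partition columns do come back, at the cost of reading every partition. A rough sketch, reusing the imports from the example above:
full = ParquetDataset('data').read().to_pandas()
print(full.columns)
# Something like: Index(['a', 'b', 'c', 'part1', 'part2'], dtype='object')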
I can do this easily in pyspark like this:
def get_spark_session_ctx(appName):
    """Get or create a Spark Session, and the underlying Context."""
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName(appName).getOrCreate()
    sc = spark.sparkContext
    return (spark, sc)

spark, sc = get_spark_session_ctx('test')
spark_df = spark.read.option('basePath', 'data').parquet(*parts2)
df3 = spark_df.toPandas()
As you can see below:
>>> df3.columns
Index(['a', 'b', 'c', 'part1', 'part2'], dtype='object')
Can this be done with pyarrow or pandas, or do I need some custom implementation?
Update: As requested by Wes, this is now on JIRA.
An ORC or Parquet data file stores only the data columns; values for partition columns are not written into the files themselves. Instead, the files are split into groups (partitions) at write time based on those column values, and the partition values are encoded in the directory names (for example part1=foo/part2=True).
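A quick way to see this with the example dataset above, assuming the layout generated earlier:
from glob import glob
import pyarrow.parquet as pq
# An individual leaf file only carries the data columns; the partition
# values live in its path, e.g. data/part1=foo/part2=True/<uuid>.parquet.
one_file = glob('data/part1=foo/part2=True/*.parquet')[0]
print(pq.read_table(one_file).column_names)  # ['a', 'b', 'c']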
As of pyarrow version 0.10.0 you can use the filters kwarg to do the query. In your case it would look something like this:
import pyarrow.parquet as pq
dataset = pq.ParquetDataset('path-to-your-dataset', filters=[('part2', '=', 'True'),])
table = dataset.read()
Ref
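For completeness, here is a hedged end-to-end sketch for the example dataset above. The 'data' path, the string comparison against 'True' (the partition values are read from the directory names, so they arrive as strings here), and the pyarrow.dataset variant for newer versions (pyarrow 1.0+) are assumptions on top of the answer, not something it states:
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds  # pyarrow >= 1.0

# Legacy API, as in the answer above: filter on the partition key and
# convert to pandas. The result should include part1/part2 as columns,
# and only the part2=True directories are read from disk.
dataset = pq.ParquetDataset('data', filters=[('part2', '=', 'True')])
df_legacy = dataset.read().to_pandas()

# Newer dataset API: declare the hive partitioning explicitly so the
# partition column types are unambiguous, then filter with an expression.
part_schema = pa.schema([('part1', pa.string()), ('part2', pa.string())])
dataset2 = ds.dataset('data', format='parquet',
                      partitioning=ds.partitioning(part_schema, flavor='hive'))
df_new = dataset2.to_table(filter=ds.field('part2') == 'True').to_pandas()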