 

How to incorporate projected columns in scanner into new dataset partitioning

Tags:

pyarrow

Let's say I load a dataset:

import pyarrow.compute as pc
import pyarrow.dataset as ds

myds = ds.dataset('mypath', format='parquet', partitioning='hive')
myds.schema
# On/Off_Peak: string
# area: string
# price: decimal128(8, 4)
# date: date32[day]
# hourbegin: int32
# hourend: int32
# inflation: string rename to Inflation
# Price_Type: string
# Reference_Year: int32
# Case: string
# region: string rename to Region

My end goal is to resave the dataset with the following projection:

projection={'Region':ds.field('region'),
            'Date':ds.field('date'),
            'isPeak':pc.equal(ds.field('On/Off_Peak'),ds.scalar('On')),
            'Hourbegin':ds.field('hourbegin'),
            'Hourend':ds.field('hourend'),
            'Inflation':ds.field('inflation'),
            'Price_Type':ds.field('Price_Type'),
            'Area':ds.field('area'),
            'Price':ds.field('price'),
            'Reference_Year':ds.field('Reference_Year'),
            'Case':ds.field('Case'),
            }

I create a scanner:

scanner=myds.scanner(columns=projection)

Then I try to save the new dataset with:

ds.write_dataset(scanner, 'newpath',
                partitioning=['Reference_Year', 'Case', 'Region'], partitioning_flavor='hive',
                format='parquet')

but I get:

KeyError: 'Column Region does not exist in schema'

I can work around this by changing my partitioning to ['Reference_Year', 'Case', 'region'] so that it matches the original (non-projected) column names, and then renaming all of the resulting directories afterwards. Is there a way to do it directly?

Suppose my partitioning required computing a new column rather than just renaming an existing one. Would I have to save a non-partitioned dataset in one step to get the new column, and then do a second save to create the partitioned dataset?

asked Oct 14 '25 at 18:10 by Dean MacGregor


1 Answer

EDIT: This bug has been fixed in pyarrow 10.0.0.

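It looks like a bug to me. It's as if write_dataset checks the partitioning columns against the scanner's dataset_schema (the columns as stored) rather than its projected_schema (the columns after your projection).

I think you can get around it by calling to_reader() on the scanner and passing the resulting RecordBatchReader, whose schema is the projected one, to write_dataset: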

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.Table.from_arrays(
    [
        pa.array(['a', 'b', 'c'], pa.string()),
        pa.array(['a', 'b', 'c'], pa.string()),
    ],
    names=['region', "Other"]
)
table_dataset = ds.dataset(table)
columns={
    "Region": ds.field('region'),
    "Other": ds.field('Other'),
}
scanner = table_dataset.scanner(columns=columns)

ds.write_dataset(
    # Pass a reader instead of the scanner so the projected schema is used.
    scanner.to_reader(),
    'newpath',
    partitioning=['Region'], partitioning_flavor='hive',
    format='parquet')

I've reported the issue here

answered Oct 19 '25 at 15:10 by 0x26res