Let's say I load a dataset:
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

myds = ds.dataset('mypath', format='parquet', partitioning='hive')
myds.schema
# On/Off_Peak: string
# area: string
# price: decimal128(8, 4)
# date: date32[day]
# hourbegin: int32
# hourend: int32
# inflation: string rename to Inflation
# Price_Type: string
# Reference_Year: int32
# Case: string
# region: string rename to Region
My end goal is to resave the dataset with the following projection:
projection = {
    'Region': ds.field('region'),
    'Date': ds.field('date'),
    'isPeak': pc.equal(ds.field('On/Off_Peak'), ds.scalar('On')),
    'Hourbegin': ds.field('hourbegin'),
    'Hourend': ds.field('hourend'),
    'Inflation': ds.field('inflation'),
    'Price_Type': ds.field('Price_Type'),
    'Area': ds.field('area'),
    'Price': ds.field('price'),
    'Reference_Year': ds.field('Reference_Year'),
    'Case': ds.field('Case'),
}
I make a scanner
scanner=myds.scanner(columns=projection)
Now I try to save my new dataset with
ds.write_dataset(
    scanner,
    'newpath',
    partitioning=['Reference_Year', 'Case', 'Region'],
    partitioning_flavor='hive',
    format='parquet',
)
but I get
KeyError: 'Column Region does not exist in schema'
I can work around this by changing my partitioning to ['Reference_Year', 'Case', 'region'] so that it matches the non-projected column names (and then renaming all of the resulting directories afterwards), but is there a way to do it directly?
Suppose my partitioning needed the compute for more than just the column name changing. Would I have to save a non-partitioned dataset in one step to get the new column and then do another save operation to create the partitioned dataset?
EDIT: this bug has been fixed in pyarrow 10.0.0
It looks like a bug to me. It's as if write_dataset is looking at the scanner's dataset_schema rather than its projected_schema.
I think you can get around it by calling to_reader on the scanner.
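import pyarrow as pa
import pyarrow.dataset as ds

# Small in-memory table with a lowercase 'region' column
table = pa.Table.from_arrays(
    [
        pa.array(['a', 'b', 'c'], pa.string()),
        pa.array(['a', 'b', 'c'], pa.string()),
    ],
    names=['region', 'Other'],
)
table_dataset = ds.dataset(table)

# Projection that renames 'region' to 'Region'
columns = {
    'Region': ds.field('region'),
    'Other': ds.field('Other'),
}
scanner = table_dataset.scanner(columns=columns)

# Passing a reader instead of the scanner makes the writer see the projected schema
ds.write_dataset(
    scanner.to_reader(),
    'newpath',
    partitioning=['Region'],
    partitioning_flavor='hive',
    format='parquet',
)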
I've reported the issue here