| business_time | value |
|---|---|
| 2024-06-29 05:00:03.287073252+00:00 | 1.3 |
| 2024-06-29 11:00:03.504073740+00:00 | 1.4 |
I am trying to create a partitioned table with Python from the DataFrame shown above. The following code works perfectly, but it creates a non-partitioned table.
import pandas as pd
import pyiceberg
from pyiceberg.catalog import load_catalog
import pyarrow as pa
catalog = load_catalog("glue_catalog",type='glue')
df = pd.read_csv(r'Test.csv')
df['business_time']=pd.to_datetime(df['business_time'], utc=True).dt.floor('us').astype(pd.ArrowDtype(pa.timestamp("us", tz='UTC')))
pat = pa.Table.from_pandas(df)
t = catalog.create_table_if_not_exists('test_schema.test_partition', pat.schema)
t.append(pat)
I'm trying to find a way to create a partitioned table equivalent to the SQL below:
CREATE TABLE glue_catalog.test_schema.test_partition (
business_time timestamp(6) with time zone,
value double
)
WITH (
format = 'PARQUET',
format_version = 2,
partitioning = ARRAY['day(business_time)']
);
However, I was unable to find any information or pointers on how to use the given DataFrame to partition on the field business_time. Thank you.
I tried many things, such as passing the argument partition_spec=pyiceberg.partitioning.DayTransform('business_time') to create_table, but I got a validation error. Is there a simple way to specify just a field? Thank you.
After doing some research I finally found the solution, in case anyone needs it in the future: the PyArrow schema needs to be converted to a PyIceberg schema, whose fields carry numerical IDs.
import pandas as pd
from pyiceberg.catalog import load_catalog
import pyarrow as pa
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import YearTransform, DayTransform
from pyiceberg.io.pyarrow import pyarrow_to_schema
from pyiceberg.table.name_mapping import NameMapping, MappedField
catalog = load_catalog("qa_glue_iceberg",type='glue')
df = pd.read_csv(r'Test.csv')
df['business_time']=pd.to_datetime(df['business_time'], utc=True).dt.floor('us').astype(pd.ArrowDtype(pa.timestamp("us", tz='UTC')))
if catalog.table_exists('test_schema.test_partition'):
    catalog.drop_table('test_schema.test_partition')
pat = pa.Table.from_pandas(df)
# Map each column name to a unique integer field ID
field_id = 0
mapped_fields = []
for name in pat.column_names:
    field_id += 1
    mapped_fields.append(MappedField(field_id=field_id, names=[name]))
name_mapping = NameMapping(mapped_fields)
icebergSchema = pyarrow_to_schema(pat.schema, name_mapping, downcast_ns_timestamp_to_us=True)
partition_source = icebergSchema.find_field('business_time')
partition_spec = PartitionSpec(
    PartitionField(
        field_id=partition_source.field_id,
        source_id=partition_source.field_id,
        transform=DayTransform(),  # matches day(business_time) in the SQL
        name="business_time_day",
    )
)
t = catalog.create_table_if_not_exists('test_schema.test_partition', icebergSchema, partition_spec=partition_spec)
t.append(pat)
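For reference, Iceberg's day() transform partitions rows by the number of whole days elapsed since the Unix epoch. A rough standalone sketch of what day(business_time) computes (plain Python; the function name is mine, not a PyIceberg API):

```python
from datetime import datetime, timezone

def day_partition_value(ts: datetime) -> int:
    # Iceberg's day() transform: whole days elapsed since 1970-01-01 UTC
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return (ts - epoch).days

# Both rows from the sample DataFrame land in the same daily partition
print(day_partition_value(datetime(2024, 6, 29, 5, 0, 3, tzinfo=timezone.utc)))
print(day_partition_value(datetime(2024, 6, 29, 11, 0, 3, tzinfo=timezone.utc)))
```

So both sample timestamps above fall into the same daily partition, which is exactly what the CREATE TABLE with partitioning = ARRAY['day(business_time)'] produces.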