Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to create a PARTITIONED table in Python using PyIceberg with pyarrow

business_time value
2024-06-29 05:00:03.287073252+00:00 1.3
2024-06-29 11:00:03.504073740+00:00 1.4

I am trying to create a partitioned table using the python code from the given DataFrame (see above). The following code perfectly works (that creates a non-partitioned table).

import pandas as pd
import pyiceberg
from pyiceberg.catalog import load_catalog
import pyarrow as pa

catalog = load_catalog("glue_catalog",type='glue')
df = pd.read_csv(r'Test.csv')

df['business_time']=pd.to_datetime(df['business_time'], utc=True).dt.floor('us').astype(pd.ArrowDtype(pa.timestamp("us", tz='UTC')))

pat = pa.Table.from_pandas(df)
t = catalog.create_table_if_not_exists('test_schema.test_partition', pat.schema)                                     t.append(pat)

I'm trying to find a way to create a partitioned table that would have the same effect as the below


CREATE TABLE glue_catalog.test_schema.test_partition (
   business_time timestamp(6) with time zone,
   value double
)
WITH (
   format = 'PARQUET',
   format_version = 2,
   partitioning = ARRAY['day(business_time)']
);

However, I am unable to find any information and pointers how to utilize the given DataFrame type to partition on the field buisness_time. Thank you.

I tried many things like adding an argument to create_table partition_spec=pyiceberg.partitioning.DayTransform('business_time'), but I got validation error. Is there any simple way to specify just a field? Thank you.

like image 293
user23873134 Avatar asked Nov 02 '25 12:11

user23873134


1 Answers

Finally after doing some research I found the solution in case anyone needs it in a future. Need to convert pyarrow schema to pyiceberg schema. The latter one has fields with numerical IDs.

import pandas as pd
from pyiceberg.catalog import load_catalog
import pyarrow as pa

from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import YearTransform, DayTransform

from pyiceberg.io.pyarrow import pyarrow_to_schema

from pyiceberg.table.name_mapping import NameMapping, MappedField

catalog = load_catalog("qa_glue_iceberg",type='glue')
df = pd.read_csv(r'Test.csv')
df['business_time']=pd.to_datetime(df['business_time'], utc=True).dt.floor('us').astype(pd.ArrowDtype(pa.timestamp("us", tz='UTC')))

if catalog.table_exists('test_schema.test_partition'):
    catalog.drop_table('test_schema.test_partition')
    
pat = pa.Table.from_pandas(df)

# Create mapping to map column name to unique integer
fieldId = 0
array = []
for name in pat.column_names:
    fieldId+=1
    array.append(MappedField(field_id=fieldId,names=[name]))

name_mapping = NameMapping(array)

icebergSchema = pyarrow_to_schema(pat.schema, name_mapping, downcast_ns_timestamp_to_us=True)
partitionField = icebergSchema.find_field('business_time')
partition_spec = PartitionSpec(
    PartitionField(field_id=partitionField.field_id, source_id=partitionField.field_id, transform=YearTransform(), name="business_time_hour"))

t = catalog.create_table_if_not_exists('test_schema.test_partition', icebergSchema, partition_spec=partition_spec)

t.append(pat)
like image 162
user23873134 Avatar answered Nov 05 '25 05:11

user23873134



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!