I would like to define a Snowpark UDF with input type snowflake.snowpark.Row.
The reason is that I would like to mimic the pandas apply approach,
where I can define my business logic in a class and then apply that logic to each row of the Snowpark dataframe. Each column can be easily mapped to a class attribute with asDict.
For example (running from the Snowflake Python worksheet):
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark import Row
from snowflake.snowpark.types import IntegerType
from dataclasses import dataclass

@dataclass
class MyEvent:
    attribute1: str = 'dummy'
    attribute2: str = 'unknown'

    def someCalculation(self) -> int:
        return len(self.attribute1) + len(self.attribute2.strip())

def testSomeCalculation():
    inputDict = {'attribute1': 'foo',
                 'attribute2': 'baz'}
    event = MyEvent(**inputDict)
    print(event.someCalculation())

def main(session: snowpark.Session):
    some_logic = udf(lambda row: MyEvent(**(row.asDict())).someCalculation()
                     , return_type=IntegerType()
                     , input_types=[Row])
However, when I try to use snowpark.Row as the input type, I get an "Unsupported data type" error:
  File "snowflake/snowpark/_internal/udf_utils.py", line 972, in create_python_udf_or_sp
    input_sql_types = [convert_sp_to_sf_type(arg.datatype) for arg in input_args]
  File "snowflake/snowpark/_internal/udf_utils.py", line 972, in <listcomp>
    input_sql_types = [convert_sp_to_sf_type(arg.datatype) for arg in input_args]
  File "snowflake/snowpark/_internal/type_utils.py", line 195, in convert_sp_to_sf_type
    raise TypeError(f"Unsupported data type: {datatype.__class__.__name__}")
TypeError: Unsupported data type: type
I see that all the UDF examples use basic types from snowpark.types.
Is there any fundamental reason why the input type cannot be a snowpark.Row?
I know I could explicitly list all MyEvent attributes in input_types=[],
but that would be error-prone and would defeat the purpose of designing
my code around a class representing my business object.
The official SQL-to-Python type mapping for Python UDFs in Snowflake lists the argument types a UDF can accept, and snowpark.Row is not one of them.
If you want to receive a dict, make the input a dict (which in SQL will be a VARIANT or OBJECT).
Then, instead of passing the row itself, convert the row to a dict before sending it to the UDF.
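As a rough sketch of that idea (not tested against a live account): pack every column into an OBJECT with object_construct and declare the UDF input as a MapType, so the handler receives a plain dict. The names some_logic_handler, add_some_logic and the RESULT column are made up for illustration. The key lowercasing assumes unquoted column names, which Snowflake stores in upper case.

```python
from dataclasses import dataclass, fields


@dataclass
class MyEvent:
    attribute1: str = 'dummy'
    attribute2: str = 'unknown'

    def someCalculation(self) -> int:
        return len(self.attribute1) + len(self.attribute2.strip())


def some_logic_handler(row: dict) -> int:
    """UDF body: receives the row as a plain dict (a SQL OBJECT)."""
    # Assumption: column names arrive upper-cased, so normalise the keys
    # and ignore anything that is not a MyEvent field.
    known = {f.name for f in fields(MyEvent)}
    kwargs = {k.lower(): v for k, v in row.items() if k.lower() in known}
    return MyEvent(**kwargs).someCalculation()


def add_some_logic(session, df):
    """Hypothetical helper: pack each row into an OBJECT and apply the UDF."""
    # Imported lazily so the handler above can be unit-tested locally
    # without Snowpark installed or a Snowflake connection.
    from snowflake.snowpark.functions import col, lit, object_construct, udf
    from snowflake.snowpark.types import IntegerType, MapType, StringType

    some_logic = udf(
        some_logic_handler,
        return_type=IntegerType(),
        # MapType maps to OBJECT in SQL and to dict in the Python handler.
        input_types=[MapType(StringType(), StringType())],
        session=session,
    )
    # object_construct takes alternating key/value arguments.
    # OBJECT_CONSTRUCT omits keys whose value is NULL, so the dataclass
    # defaults apply to those attributes.
    pairs = [part for name in df.columns for part in (lit(name), col(name))]
    return df.with_column("RESULT", some_logic(object_construct(*pairs)))
```

The handler can be checked locally with an ordinary dict, for example some_logic_handler({'ATTRIBUTE1': 'foo', 'ATTRIBUTE2': 'baz'}), before registering it. The attribute list still lives only in MyEvent, so adding a field to the class does not require touching input_types.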