I'm writing a process that needs to generate a UUID for groups of records that match on certain criteria. My code works, but I'm worried about potential issues from creating the UUID inside my UDF (which makes it non-deterministic). Here's a simplified example to illustrate:
from uuid import uuid1
from pyspark.sql import SparkSession
from pyspark.sql.functions import PandasUDFType, pandas_udf
spark = (
    SparkSession.builder.master("local")
    .appName("Word Count")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

df = spark.createDataFrame([["j", 3], ["h", 3], ["a", 2]], ["name", "age"])

@pandas_udf("name string, age integer, uuid string", PandasUDFType.GROUPED_MAP)
def create_uuid(df):
    df["uuid"] = str(uuid1())
    return df
>>> df.groupby("age").apply(create_uuid).show()
+----+---+--------------------+
|name|age| uuid|
+----+---+--------------------+
| j| 3|1f8f48ac-0da8-430...|
| h| 3|1f8f48ac-0da8-430...|
| a| 2|d5206d03-bcce-445...|
+----+---+--------------------+
This currently works in a data processing job over 200k records on AWS Glue, and I haven't found any bugs yet. I use uuid1 since it incorporates the node information when generating the UUID, ensuring that no two nodes generate the same id.
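For reference, a quick standard-library check (outside Spark) of what uuid1 encodes; the field accessors are from Python's uuid module, and this snippet is only an illustration, not part of the Glue job:

from uuid import uuid1

u = uuid1()
# uuid1 packs a 60-bit timestamp, a clock sequence and a 48-bit node id
# (normally derived from a MAC address) into the UUID.
print(u.time)   # 100-nanosecond intervals since 1582-10-15
print(u.node)   # 48-bit node identifier
# The node id is the last field of the textual form:
assert "%012x" % u.node == str(u).split("-")[-1]

Note that if Python can't determine a hardware address, uuid.getnode() falls back to a random 48-bit number, so cross-node collisions remain extremely unlikely but are not strictly impossible.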
One thought I had was to register the UDF as non-deterministic with:
udf = pandas_udf(
    create_uuid, "name string, age integer, uuid string", PandasUDFType.GROUPED_MAP
).asNondeterministic()
But that gave me the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o60.flatMapGroupsInPandas.
: org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in
Project, Filter, Aggregate or Window, found:
`age`,create_uuid(name, age),`name`,`age`,`uuid`
in operator FlatMapGroupsInPandas [age#1L], create_uuid(name#0, age#1L), [name#7, age#8, uuid#9]
;;
FlatMapGroupsInPandas [age#1L], create_uuid(name#0, age#1L), [name#7, age#8, uuid#9]
+- Project [age#1L, name#0, age#1L]
+- LogicalRDD [name#0, age#1L], false
My questions are:

1. Does leaving the UDF registered as deterministic risk any data quality issues, e.g. duplicate invocations producing different UUIDs for the same group?
2. Is there a way to register a GROUPED_MAP pandas_udf as non-deterministic, or a reason why GroupedData.apply doesn't accept one?
Your function is non-deterministic, but Spark is treating it as deterministic, i.e. "Due to optimization, duplicate invocations may be eliminated". However, each call to the pandas_udf receives a unique input (the rows grouped by key), so the optimisation for duplicate calls to the pandas_udf is never triggered. Hence the asNondeterministic method for suppressing such optimisations is redundant for a pandas_udf of GROUPED_MAP type. In my opinion, this explains why the GroupedData.apply function has not been coded to accept a pandas_udf marked as non-deterministic: it would make no sense to, as there are no optimisation opportunities to suppress.
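To illustrate, here is a small sketch (reusing the spark session and df from the question) of where asNondeterministic is accepted: a SCALAR pandas_udf ends up inside a Project, where the analyzer allows non-deterministic expressions, whereas the GROUPED_MAP version goes through FlatMapGroupsInPandas, which is why you get the AnalysisException above. The helper name add_uuid and the per-row UUIDs are purely illustrative, not something from your job.

import pandas as pd
from uuid import uuid1
from pyspark.sql.functions import PandasUDFType, pandas_udf

@pandas_udf("string", PandasUDFType.SCALAR)
def add_uuid(names):
    # one UUID per row, just to demonstrate the mechanics
    return pd.Series([str(uuid1()) for _ in range(len(names))])

# Accepted: the non-deterministic SCALAR UDF sits in a Project.
add_uuid_nd = add_uuid.asNondeterministic()
df.withColumn("uuid", add_uuid_nd("name")).show()

For your use case, though, the GROUPED_MAP UDF as you wrote it is fine without the flag, for the reasons above.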