What problems can arise from a Spark non-deterministic Pandas UDF

I'm writing a process that needs to generate a UUID for certain groups that match based on some criteria. I got my code working, but I'm worried about potential issues from creating the UUID within my UDF (thus making it non-deterministic). Here's a simplified example to illustrate:

from uuid import uuid1

from pyspark.sql import SparkSession
from pyspark.sql.functions import PandasUDFType, pandas_udf

spark = (
    SparkSession.builder.master("local")
    .appName("Word Count")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
df = spark.createDataFrame([["j", 3], ["h", 3], ["a", 2]], ["name", "age"])


@pandas_udf("name string, age integer, uuid string", PandasUDFType.GROUPED_MAP)
def create_uuid(df):
    df["uuid"] = str(uuid1())
    return df


>>> df.groupby("age").apply(create_uuid).show()
+----+---+--------------------+
|name|age|                uuid|
+----+---+--------------------+
|   j|  3|1f8f48ac-0da8-430...|
|   h|  3|1f8f48ac-0da8-430...|
|   a|  2|d5206d03-bcce-445...|
+----+---+--------------------+

This currently works in a data processing job on AWS Glue handling over 200k records, and I haven't found any bugs yet.

I use uuid1 since it incorporates the node information when generating the UUID, which ensures that no two nodes generate the same id.

One thought I had was to register the UDF as non-deterministic with:

udf = pandas_udf(
    create_uuid, "name string, age integer, uuid string", PandasUDFType.GROUPED_MAP
).asNondeterministic()

But that gave me the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o60.flatMapGroupsInPandas.
: org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in
Project, Filter, Aggregate or Window, found:
 `age`,create_uuid(name, age),`name`,`age`,`uuid`
in operator FlatMapGroupsInPandas [age#1L], create_uuid(name#0, age#1L), [name#7, age#8, uuid#9]
               ;;
FlatMapGroupsInPandas [age#1L], create_uuid(name#0, age#1L), [name#7, age#8, uuid#9]
+- Project [age#1L, name#0, age#1L]
   +- LogicalRDD [name#0, age#1L], false

My questions are:

  • What are some potential issues this could encounter?
  • If it does have potential issues, what are some ways in which I could make this deterministic? (see the sketch after this list)
  • Why can't GROUPED_MAP functions be labeled as non-deterministic?
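
For the second point, one idea would be to derive the UUID from the group key with uuid5 instead of generating a random one, which would make the function fully deterministic (a minimal sketch, assuming age uniquely identifies a group; note the same group would then get the same id on every run):

from uuid import NAMESPACE_OID, uuid5

@pandas_udf("name string, age integer, uuid string", PandasUDFType.GROUPED_MAP)
def create_uuid_deterministic(df):
    # Hash the group key into a UUID: retries and recomputed partitions
    # always produce the same id for the same group.
    df["uuid"] = str(uuid5(NAMESPACE_OID, str(df["age"].iloc[0])))
    return df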
asked May 17 '20 by aiguofer

People also ask

Why UDF are not recommended in Spark?

Caveats of using Spark UDFs: when we use UDFs we end up losing the optimizations Spark performs on our DataFrame/Dataset, because a UDF is as good as a black box to Spark's optimizer.
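
As an illustration, the same predicate written with built-in functions stays transparent to Catalyst, while a UDF version is opaque (a minimal sketch reusing the df from the question; compare the plans with explain()):

from pyspark.sql.functions import col, udf

is_adult = udf(lambda age: age >= 18, "boolean")

# Built-in expression: Catalyst can analyse it and, for example, push
# the filter down to the data source.
df.filter(col("age") >= 18).explain()

# UDF: a black box, so every row is serialized to a Python worker just
# to evaluate the predicate, and no pushdown happens.
df.filter(is_adult(col("age"))).explain()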

What is difference between UDF and UDAF in Spark SQL?

Spark SQL supports integration of Hive UDFs, UDAFs and UDTFs. Similar to Spark UDFs and UDAFs, Hive UDFs work on a single row as input and generate a single row as output, while Hive UDAFs operate on multiple rows and return a single aggregated row as a result.
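
The contrast can be sketched with pandas UDFs (assuming the Spark 2.4-style PandasUDFType API used in the question):

from pyspark.sql.functions import PandasUDFType, pandas_udf

# UDF-style: one value in, one value out for every row.
@pandas_udf("long")
def plus_one(age):
    return age + 1

# UDAF-style: a whole group's values in, one aggregated value out.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_age(age):
    return age.mean()

df.select(plus_one(df["age"])).show()
df.groupby("name").agg(mean_age(df["age"])).show()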

What is pandas UDF Spark?

Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas UDF is defined using pandas_udf as a decorator or to wrap the function, and no additional configuration is required.
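
Both spellings look like this (a minimal sketch of a scalar pandas UDF):

from pyspark.sql.functions import pandas_udf

# As a decorator.
@pandas_udf("long")
def times_two(s):
    # s is a pandas Series covering a whole batch of rows.
    return s * 2

# Wrapping a plain function instead.
def _times_two(s):
    return s * 2

times_two_wrapped = pandas_udf(_times_two, "long")

df.select(times_two(df["age"]), times_two_wrapped(df["age"])).show()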

Why are UDFs slow in PySpark?

On 30th October 2017, Spark introduced vectorized UDFs for PySpark. Plain Python UDFs are slow because they are not implemented in an optimized way: Spark added a Python API back in version 0.7 with support for user-defined functions, and those UDFs are evaluated one row at a time, serializing every row between the JVM and a Python worker.


1 Answer

Your function is non-deterministic, but Spark treats it as deterministic, i.e. "Due to optimization, duplicate invocations may be eliminated". However, each call to the pandas_udf receives a unique input (the rows grouped by key), so the optimisation that eliminates duplicate calls is never triggered. Hence the asNondeterministic method, which exists to suppress such optimisations, is redundant for a pandas_udf of GROUPED_MAP type. In my opinion, this explains why GroupedData.apply has not been coded to accept a pandas_udf marked as non-deterministic: it would make no sense to, as there are no optimisation opportunities to suppress.
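
To see the contrast, the same asNondeterministic call is accepted when the UDF is evaluated inside a Project, e.g. a scalar pandas UDF applied with withColumn (a minimal sketch):

import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf

# A scalar pandas UDF that produces a random value per row.
rand_val = pandas_udf(
    lambda s: pd.Series(np.random.rand(len(s))), "double"
).asNondeterministic()

# withColumn is a Project, one of the operators where Spark allows
# non-deterministic expressions, so this runs without the
# AnalysisException shown in the question.
df.withColumn("rand", rand_val(df["age"])).show()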

answered Oct 02 '22 by Chris