Given the following Python function:
def f(col):
    return col
If I turn it into a UDF and apply it to a column object, it works...
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
df = spark.range(10)
udf = F.udf(f, returnType=DoubleType()).asNondeterministic()
df.withColumn('new', udf(F.lit(0))).show()
...Except if the column is generated by rand():
df.withColumn('new', udf(F.rand())).show() # fails
However, the following two work:
df.withColumn('new', F.rand()).show()
df.withColumn('new', F.rand()).withColumn('new2', udf(F.col('new'))).show()
The error:
Py4JJavaError: An error occurred while calling o469.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 1 times, most recent failure: Lost task 0.0 in stage 20.0 (TID 34, localhost, executor driver): java.lang.NullPointerException
Why does this happen, and how can I pass a column expression created by rand() to a UDF?
The core issue is that rand() on the JVM side depends on a transient rng field that doesn't survive serialization/deserialization, combined with an eval implementation that is not null safe (both defined in the RDG class and its Rand subclass). As far as I can tell, rand() and randn() are the only functions in Spark with this specific combination of properties.
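To see why a lost transient field produces this failure mode, here is a small Python analogy (not Spark code; RandLike, rng, and eval are made-up names for illustration): an attribute excluded from serialization comes back uninitialized after a pickle round trip, and a method that dereferences it without a null check blows up, which is the Python equivalent of the JVM NullPointerException.
import pickle
import random

class RandLike:
    # Toy analogy for Spark's Rand expression (all names here are
    # made up): `rng` is initialized eagerly but, like a @transient
    # field on the JVM, is dropped from the serialized state.

    def __init__(self, seed):
        self.seed = seed
        self.rng = random.Random(seed)   # eagerly initialized

    def __getstate__(self):
        # Mimic @transient: exclude `rng` when pickling.
        state = self.__dict__.copy()
        del state['rng']
        return state

    def __setstate__(self, state):
        # Nothing re-initializes `rng` after deserialization.
        self.__dict__.update(state)
        self.rng = None

    def eval(self):
        # Null-unsafe, like Rand.eval: assumes `rng` is set.
        return self.rng.random()

r = RandLike(42)
print(r.eval())                          # works: rng set in __init__
r2 = pickle.loads(pickle.dumps(r))
print(r2.eval())                         # AttributeError: rng is None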
When you write udf(F.rand()), Spark evaluates this as a single PythonUDF expression and thus serializes the rand() invocation in the command pickle, losing the initialized transient field along the way. This can be observed in the execution plan:
df.withColumn('new', udf(F.rand())).explain()
== Physical Plan ==
*(2) Project [id#0L, pythonUDF0#95 AS new#92]
+- BatchEvalPython [f(rand(-6878806567622466209))], [id#0L, pythonUDF0#95]
+- *(1) Range (0, 10, step=1, splits=8)
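For contrast, you can also explain the variant from the question that works. The plan should show rand evaluated in an ordinary JVM-side Project before BatchEvalPython, so the Python side only receives already-computed values rather than the Rand expression itself (the exact plan text varies by Spark version):
# rand() is materialized by the JVM before BatchEvalPython, so the
# Python worker never deserializes the Rand expression itself.
df.withColumn('new', F.rand()).withColumn('new2', udf(F.col('new'))).explain()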
Unfortunately, you're unlikely to overcome this issue without a fix in Spark that makes the Rand class null safe. However, if you just need to generate random numbers, you can trivially build your own rand() UDF around Python's random generator:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from random import random
def f(col):
    return col
df = spark.range(10)
udf = F.udf(f, returnType=DoubleType()).asNondeterministic()
rand = F.udf(random, returnType=DoubleType()).asNondeterministic()
df.withColumn('new', udf(rand())).show()
+---+-------------------+
| id| new|
+---+-------------------+
| 0| 0.4384090392727712|
| 1| 0.5827392568376621|
| 2| 0.4249312702725516|
| 3| 0.8423409231783007|
| 4|0.39533981334524604|
| 5| 0.7073194901736066|
| 6|0.19176164335919255|
| 7| 0.7296698171715453|
| 8| 0.799510901886918|
| 9|0.12662129139761658|
+---+-------------------+
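Alternatively, building on the pattern that already works in your question, you can materialize the rand() column with withColumn first, so the UDF receives a plain column reference instead of the serialized Rand expression (the intermediate column name 'r' is arbitrary):
# Materialize rand() as a regular column first; the UDF then gets an
# attribute reference, not a Rand expression, so no NPE occurs.
df.withColumn('r', F.rand()) \
  .withColumn('new', udf(F.col('r'))) \
  .drop('r') \
  .show()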