
Why does a PySpark UDF that operates on a column generated by rand() fail?

Given the following Python function:

def f(col):
    return col

If I turn it into a UDF and apply it to a column object, it works...

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

df = spark.range(10)
udf = F.udf(f, returnType=DoubleType()).asNondeterministic()

df.withColumn('new', udf(F.lit(0))).show()

...except when the column is generated by rand():

df.withColumn('new', udf(F.rand())).show()  # fails

However, the following two work:

df.withColumn('new', F.rand()).show()
df.withColumn('new', F.rand()).withColumn('new2', udf(F.col('new'))).show()

The error:

Py4JJavaError: An error occurred while calling o469.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 1 times, most recent failure: Lost task 0.0 in stage 20.0 (TID 34, localhost, executor driver): java.lang.NullPointerException

Why does this happen, and how can I use a column expression generated by rand() as input to a UDF?

asked Apr 24 '19 by gmds

1 Answer

The core issue is that, on the JVM side, the rand() expression depends on a transient rng variable that does not survive serialization/deserialization, combined with an eval implementation that is not null safe (both defined in the RDG class and its Rand subclass). As far as I can tell, rand() and randn() are the only Spark functions with this particular combination of properties.
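To see why a transient field plus a null-unsafe eval is fatal here, consider a minimal Python analogy. This is not Spark's actual code, just an illustration of the JVM-side mechanism: the RNG is dropped during pickling, the way a @transient field is dropped during Java serialization, and eval() uses it without a null check:

import pickle
import random

class FakeRand:
    # Stand-in for Spark's Rand expression (illustrative names only).
    def __init__(self, seed):
        self.seed = seed
        self.rng = None  # set lazily, like Spark's initializeInternal()

    def initialize(self, partition_index):
        self.rng = random.Random(self.seed + partition_index)

    def eval(self):
        return self.rng.random()  # no null check, like Rand's evalInternal

    def __getstate__(self):
        state = self.__dict__.copy()
        state['rng'] = None  # mimic @transient: lost on serialization
        return state

r = FakeRand(seed=42)
r.initialize(partition_index=0)
print(r.eval())  # works: rng was initialized on this side

r2 = pickle.loads(pickle.dumps(r))  # a serialization round trip
r2.eval()  # AttributeError: the Python analogue of the NullPointerException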

When you write udf(F.rand()), Spark evaluates this as a single PythonUDF expression and therefore serializes the rand() invocation into the command_pickle, losing the initialized transient rng along the way. This can be observed in the execution plan:

df.withColumn('new', udf(F.rand())).explain()

== Physical Plan ==
*(2) Project [id#0L, pythonUDF0#95 AS new#92]
+- BatchEvalPython [f(rand(-6878806567622466209))], [id#0L, pythonUDF0#95]
   +- *(1) Range (0, 10, step=1, splits=8)
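For contrast, explaining the variant that works shows rand() evaluated in its own JVM-side Project step below BatchEvalPython, so the UDF only ever receives a materialized double column (the exact plan text varies by Spark version):

df.withColumn('new', F.rand()).withColumn('new2', udf(F.col('new'))).explain()
# Expect rand(...) AS new#... computed in a Project beneath the
# BatchEvalPython [f(new#...)] node: the Rand expression stays on the JVM,
# where initializeInternal() runs before eval().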

Unfortunately, you're unlikely to overcome this issue without a fix in Spark that makes the Rand class null safe. However, if you just need to generate random numbers, you can trivially build your own rand() UDF around Python's random generator:

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from random import random

def f(col):
    return col

df = spark.range(10)
udf = F.udf(f, returnType=DoubleType()).asNondeterministic()
# Wrap Python's own RNG instead of F.rand(); asNondeterministic() stops
# Spark from caching or collapsing the per-row calls.
rand = F.udf(random, returnType=DoubleType()).asNondeterministic()

df.withColumn('new', udf(rand())).show()

+---+-------------------+
| id|                new|
+---+-------------------+
|  0| 0.4384090392727712|
|  1| 0.5827392568376621|
|  2| 0.4249312702725516|
|  3| 0.8423409231783007|
|  4|0.39533981334524604|
|  5| 0.7073194901736066|
|  6|0.19176164335919255|
|  7| 0.7296698171715453|
|  8|  0.799510901886918|
|  9|0.12662129139761658|
+---+-------------------+
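Alternatively, as the question itself demonstrates, you can materialize rand() into a real column first and pass that column to the UDF; the Rand expression then stays on the JVM side, where it is properly initialized:

df.withColumn('new', F.rand()).withColumn('new2', udf(F.col('new'))).show()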
answered Nov 28 '22 by rluta