 

Using UDF ignores condition in when

Suppose you had the following PySpark DataFrame:

data = [('foo',), ('123',), (None,), ('bar',)]
df = sqlCtx.createDataFrame(data, ["col"])
df.show()
#+----+
#| col|
#+----+
#| foo|
#| 123|
#|null|
#| bar|
#+----+
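
(Aside: sqlCtx above is an existing SQLContext. To reproduce this from scratch, a minimal Spark 2.x setup might look like the sketch below; the local SparkSession is an assumption, not part of the original snippet.)

from pyspark.sql import SparkSession

# Assumption: a local SparkSession standing in for whatever provided sqlCtx.
spark = SparkSession.builder.master('local[*]').getOrCreate()
df = spark.createDataFrame([('foo',), ('123',), (None,), ('bar',)], ["col"])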

The next two code blocks should do the same thing: return the uppercase of the column if it is not null. However, the second method (using a udf) produces an error.

Method 1: Using pyspark.sql.functions.upper()

import pyspark.sql.functions as f
df.withColumn(
    'upper',
    f.when(
        f.isnull(f.col('col')),
        f.col('col')
    ).otherwise(f.upper(f.col('col')))
).show()
#+----+-----+
#| col|upper|
#+----+-----+
#| foo|  FOO|
#| 123|  123|
#|null| null|
#| bar|  BAR|
#+----+-----+

Method 2: Using str.upper() inside a udf

from pyspark.sql.types import StringType

df.withColumn(
    'upper',
    f.when(
        f.isnull(f.col('col')),
        f.col('col')
    ).otherwise(f.udf(lambda x: x.upper(), StringType())(f.col('col')))
).show()

This gives me AttributeError: 'NoneType' object has no attribute 'upper'. Why is the f.isnull() check in the call to when seemingly being ignored?

I know that I can change my udf to f.udf(lambda x: x.upper() if x else x, StringType()) to avoid this error, but I'd like to understand why it's happening.
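
For reference, here is Method 2 with that null-safe lambda swapped in; the only change from the failing version is the lambda body:

from pyspark.sql.types import StringType

df.withColumn(
    'upper',
    f.when(
        f.isnull(f.col('col')),
        f.col('col')
    ).otherwise(
        # Null-safe variant: pass None through instead of calling .upper() on it
        f.udf(lambda x: x.upper() if x else x, StringType())(f.col('col'))
    )
).show()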

Full Traceback:

Py4JJavaErrorTraceback (most recent call last)
<ipython-input-38-cbf0ffe73538> in <module>()
      4         f.isnull(f.col('col')),
      5         f.col('col')
----> 6     ).otherwise(f.udf(lambda x: x.upper(), StringType())(f.col('col')))
      7 ).show()

/opt/SPARK2/lib/spark2/python/pyspark/sql/dataframe.py in show(self, n, truncate)
    316         """
    317         if isinstance(truncate, bool) and truncate:
--> 318             print(self._jdf.showString(n, 20))
    319         else:
    320             print(self._jdf.showString(n, int(truncate)))

/opt/SPARK2/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/opt/SPARK2/lib/spark2/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/SPARK2/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o642.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 77.0 failed 4 times, most recent failure: Lost task 51.3 in stage 77.0 (TID 5101, someserver.prod.somecompany.net, executor 99): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/opt/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<ipython-input-38-cbf0ffe73538>", line 6, in <lambda>
AttributeError: 'NoneType' object has no attribute 'upper'
asked Apr 03 '18 by pault



1 Answer

You have to remember that Spark SQL (unlike RDD) is not what-you-see-is-what-you-get. The optimizer / planner is free to schedule operations in an arbitrary order, or even to repeat stages multiple times.

Python udfs are not applied on a row-by-row basis, but in batch mode. when is not so much ignored as it is simply not used to optimize the execution plan:

== Physical Plan ==
*Project [col#0, CASE WHEN isnull(col#0) THEN col#0 ELSE pythonUDF0#21 END AS upper#17]
+- BatchEvalPython [<lambda>(col#0)], [col#0, pythonUDF0#21]
   +- Scan ExistingRDD[col#0]
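
For reference, a plan like the one above can be printed for any DataFrame with explain(); a minimal sketch using the question's df and f:

from pyspark.sql.types import StringType

# Print the physical plan for the Method 2 expression;
# explain(True) would also show the parsed, analyzed, and optimized logical plans.
df.withColumn(
    'upper',
    f.when(
        f.isnull(f.col('col')),
        f.col('col')
    ).otherwise(f.udf(lambda x: x.upper(), StringType())(f.col('col')))
).explain()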

Therefore, the function used with udf has to be robust to None inputs, for example:

df.withColumn(
    'upper',
    f.udf(
        lambda x: x.upper() if x is not None else None, 
        StringType()
    )(f.col('col'))
).show()
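
Run against the example data, this should produce the same output as Method 1:

#+----+-----+
#| col|upper|
#+----+-----+
#| foo|  FOO|
#| 123|  123|
#|null| null|
#| bar|  BAR|
#+----+-----+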
answered Nov 14 '22 by user9592283