I'm trying to use Pandas UDFs (a.k.a. Vectorized UDFs) in Apache Spark 2.4.0 on macOS 10.14.3 (macOS Mojave).
I installed pandas
and pyarrow
using pip
(and later pip3
).
Whenever I execute the sample code from the official documentation of Spark SQL I get the following exception.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
def multiply_func(a, b):
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
The exception is as follows:
objc[97883]: +[__NSPlaceholderDictionary initialize] may have been in progress in another thread when fork() was called.
objc[97883]: +[__NSPlaceholderDictionary initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
19/03/27 15:01:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:486)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:475)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:34)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:178)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:98)
at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:96)
at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:128)
...
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:159)
... 28 more
Scalar Pandas UDFs are used for vectorizing scalar operations. To define a scalar Pandas UDF, simply use @pandas_udf to annotate a Python function that takes in pandas. Series as arguments and returns another pandas. Series of the same size.
A pandas user-defined function (UDF)—also known as vectorized UDF—is a user-defined function that uses Apache Arrow to transfer data and pandas to work with the data. pandas UDFs allow vectorized operations that can increase performance up to 100x compared to row-at-a-time Python UDFs.
I found a solution in Doesn't work on macOS High Sierra #69 and thought I'd post it on StackOverflow.
You should make sure that Xcode's command line tools are already installed. If not, execute the following:
xcode-select --install
What turned out very important was to export OBJC_DISABLE_INITIALIZE_FORK_SAFETY
env var:
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
With the two above the code worked fine:
>>> # Execute function as a Spark vectorized UDF
... df.select(multiply(col("x"), col("x"))).show()
[Stage 0:> (0 + 1) / 1]/usr/local/lib/python3.7/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
warnings.warn("pyarrow.open_stream is deprecated, please use "
/usr/local/lib/python3.7/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
warnings.warn("pyarrow.open_stream is deprecated, please use "
/usr/local/lib/python3.7/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
warnings.warn("pyarrow.open_stream is deprecated, please use "
/usr/local/lib/python3.7/site-packages/pyarrow/__init__.py:159: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
warnings.warn("pyarrow.open_stream is deprecated, please use "
+-------------------+
|multiply_func(x, x)|
+-------------------+
| 1|
| 4|
| 9|
+-------------------+
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With