I started playing around with Spark locally and ran into this weird issue:
    1) pip install pyspark==2.3.1
    2) start the pyspark shell and run:
    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType, udf
    df = pd.DataFrame({'x': [1,2,3], 'y':[1.0,2.0,3.0]})
    sp_df = spark.createDataFrame(df)
    @pandas_udf('long', PandasUDFType.SCALAR)
    def pandas_plus_one(v):
        return v + 1
    sp_df.withColumn('v2', pandas_plus_one(sp_df.x)).show()
I took this example from https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
Any idea why I keep getting this error?
py4j.protocol.Py4JJavaError: An error occurred while calling o108.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 8, localhost, executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:333)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:322)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:177)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:121)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:90)
    at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:88)
    at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:131)
    at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:93)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:158)
    ... 27 more
A Pandas UDF is defined using pandas_udf as a decorator or to wrap the function, and no additional configuration is required. In general, a Pandas UDF behaves like a regular PySpark function API. New in version 2.3.0. Before Spark 3.0, Pandas UDFs were defined with PandasUDFType. From Spark 3.0 with Python 3.6+, you can also use Python type hints. Using Python type hints is preferred, and PandasUDFType will be deprecated in a future release.
Parameters: f, the user-defined function (a Python function if used as a standalone function); returnType, the return type of the user-defined function.
To use a Pandas UDF, the function has to be changed so that it operates on a Series. Add a wrapper function around the palindrome function, called vector palindrome, that uses the apply method of the pandas Series to apply the palindrome function to each element. Then register this wrapper with pandas_udf and call it.
I had the same problem. I found it to be a version problem between pandas and numpy.
For me the following works:
numpy==1.14.5
pandas==0.23.4
pyarrow==0.10.0
Before, I had the following non-working combination:
numpy==1.15.1
pandas==0.23.4
pyarrow==0.10.0
In my case the only issue was an incompatible version of pyarrow. Spark 2.4.0 was built with pyarrow 0.10.0 (https://issues.apache.org/jira/browse/SPARK-23874).
I reverted my pyarrow package to 0.10.0 (the installed version was 0.15.x) and it worked perfectly.
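Since the answers here all come down to which numpy, pandas and pyarrow versions are installed, it may help to print them from the same Python environment the Spark workers use. The snippet below is only a sketch: the helper name installed_versions is made up for illustration, the package list is just the ones mentioned in this thread, and it assumes Python 3.8+ for importlib.metadata (on older interpreters, pip freeze gives the same information).

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages=("numpy", "pandas", "pyarrow", "pyspark")):
    """Return a dict mapping each package name to its installed version.

    Packages that are not installed map to None. The helper name and the
    default package list are illustrative, not part of any library.
    """
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    # Print in requirements.txt style so the output can be compared
    # directly with the working combinations listed in this thread.
    for name, ver in installed_versions().items():
        print(f"{name}=={ver}" if ver else f"{name}: not installed")
```

If the printed pyarrow version is newer than the one your Spark release was built against, pinning it (for example to 0.10.0, as reported above) is the first thing to try.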
The config that works for me is:
numpy==1.14.3
pandas==0.23.0
pyarrow==0.10.0