Python pandas_udf spark error

I started playing around with Spark locally and ran into this weird issue:


    # 1) Install PySpark locally
    pip install pyspark==2.3.1
    # 2) Start the PySpark shell and run:
    pyspark

    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    # Build a small pandas DataFrame and convert it to a Spark DataFrame.
    df = pd.DataFrame({'x': [1, 2, 3], 'y': [1.0, 2.0, 3.0]})
    sp_df = spark.createDataFrame(df)

    # Scalar pandas UDF: receives a pandas Series and returns a pandas Series.
    @pandas_udf('long', PandasUDFType.SCALAR)
    def pandas_plus_one(v):
        return v + 1

    sp_df.withColumn('v2', pandas_plus_one(sp_df.x)).show()

I took this example from https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

Any idea why I keep getting this error?

py4j.protocol.Py4JJavaError: An error occurred while calling o108.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 8, localhost, executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:333)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:322)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:177)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:121)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:90)
    at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:88)
    at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:131)
    at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:93)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:158)
    ... 27 more
asked Aug 06 '18 by Shrikar

People also ask

How does a Pandas UDF behave in Spark?

A Pandas UDF behaves like a regular PySpark function API in general. Before Spark 3.0, Pandas UDFs were defined with PandasUDFType. From Spark 3.0 with Python 3.6+, you can also use Python type hints. Using type hints is preferred, and PandasUDFType will be deprecated in a future release.
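
For illustration, here is a minimal sketch of the question's plus-one UDF in the Spark 3.0+ type-hint style:

    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    # Spark 3.0+: declare the Spark return type in the decorator and
    # the pandas input/output types with Python type hints.
    @pandas_udf('long')
    def pandas_plus_one(v: pd.Series) -> pd.Series:
        return v + 1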

What is the difference between a Pandas UDF and a plain Python function?

A Pandas UDF is defined by using pandas_udf either as a decorator or to wrap a plain function, and no additional configuration is required. It behaves like a regular PySpark function API in general, except that it receives and returns pandas objects instead of single values. The pandas_udf call takes the user-defined Python function and the return type of the user-defined function. Pandas UDFs are new in Spark 2.3.0.
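
As a sketch, both registration styles for the same kind of function, using the Spark 2.x API from the question (function names are illustrative):

    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    # Style 1: pandas_udf as a decorator.
    @pandas_udf('long', PandasUDFType.SCALAR)
    def plus_one(v):
        return v + 1

    # Style 2: pandas_udf wrapping an existing plain function.
    def plus_two(v):
        return v + 2

    plus_two_udf = pandas_udf(plus_two, 'long', PandasUDFType.SCALAR)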

What is a UDF (User Defined Function) in Python Spark?

UDF stands for User Defined Function. In Python Spark, a UDF is a custom Python function that you register with Spark and then apply to DataFrame columns like a built-in function.
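
A rough side-by-side of the two kinds, again assuming the Spark 2.x API used in the question:

    import pandas as pd
    from pyspark.sql.functions import udf, pandas_udf, PandasUDFType

    # Plain Python UDF: invoked once per row with ordinary Python values.
    plus_one_row = udf(lambda x: x + 1, 'long')

    # Pandas UDF: invoked once per Arrow batch with a pandas Series,
    # so the arithmetic is vectorized.
    @pandas_udf('long', PandasUDFType.SCALAR)
    def plus_one_batch(v):
        return v + 1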

How do you use a palindrome function in a pandas UDF?

To use a pandas UDF, the function has to be changed so that it operates on a pandas Series. Add a wrapper function (call it vector_palindrome) that uses the apply method of a pandas Series to run the palindrome function over every element, then register the wrapper with pandas_udf and call it on a column, as sketched below.
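
A minimal sketch of that pattern, using the Spark 2.x PandasUDFType API from the question (the DataFrame and its word column are hypothetical):

    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    def palindrome(s):
        # Plain Python function that checks a single string.
        return s == s[::-1]

    @pandas_udf('boolean', PandasUDFType.SCALAR)
    def vector_palindrome(words):
        # Wrapper: words is a pandas Series, so Series.apply runs
        # palindrome over every element in the batch.
        return words.apply(palindrome)

    # sp_df.withColumn('is_palindrome', vector_palindrome(sp_df.word)).show()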


2 Answers

I had the same problem. I found it to be a version problem between pandas and numpy; in my case only the numpy version differed between the working and non-working setups.

For me the following works:

    numpy==1.14.5
    pandas==0.23.4
    pyarrow==0.10.0
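
If you want to reproduce that environment, pinning all three packages in one pip call keeps the resolver from upgrading any of them (assuming a plain pip-managed environment):

    pip install numpy==1.14.5 pandas==0.23.4 pyarrow==0.10.0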

Before that, I had the following non-working combination:

    numpy==1.15.1
    pandas==0.23.4
    pyarrow==0.10.0
answered by Sebastian

I found the issue to be just an incompatible version of pyarrow: Spark 2.4.0 was built with pyarrow 0.10.0 (https://issues.apache.org/jira/browse/SPARK-23874).

I reverted my pyarrow package to 0.10.0 (I had 0.15.x installed) and it worked perfectly.
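
For reference, the downgrade itself is a single pip call (assuming a plain pip-managed environment):

    pip install pyarrow==0.10.0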

The config that works for me is:

    numpy==1.14.3
    pandas==0.23.0
    pyarrow==0.10.0
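
A quick way to confirm the pinned versions actually took effect before re-running the pandas_udf example:

    import numpy, pandas, pyarrow

    # Print the versions the Python driver actually sees; if your
    # executors use a separate environment, check there as well.
    print(numpy.__version__, pandas.__version__, pyarrow.__version__)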

answered by varun