I have recently started getting a bunch of errors on a number of pyspark
jobs running on EMR clusters. The errors are:
java.lang.IllegalArgumentException
at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:98)
at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:96)
at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:127)...
They all seem to happen in apply
functions of a pandas Series. The only change I found is that pyarrow
was updated on Saturday (05/10/2019). Tests seem to work with 0.14.1.
So my question is: does anyone know whether this is a bug in the newly updated pyarrow, or is there some significant change that will make Pandas UDFs hard to use in the future?
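For reference, a minimal sketch of the kind of job that hits this code path; the column and function names are made up, but any scalar Pandas UDF that calls .apply on the incoming pandas Series is streamed between the JVM and the Python worker as Arrow batches, which is where ArrowPythonRunner.read in the stack trace fails when the IPC formats disagree:

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()

# Hypothetical scalar Pandas UDF: each batch arrives as a pandas Series
# (deserialized from Arrow), and the result is sent back to the JVM as Arrow.
@pandas_udf(DoubleType(), PandasUDFType.SCALAR)
def plus_one(v):
    return v.apply(lambda x: x + 1.0)

df = spark.range(10).withColumn("x", plus_one("id"))
df.show()
```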
The PyArrow library provides a Python API for the functionality provided by the Arrow libraries, along with tools for Arrow integration and interoperability with pandas, NumPy, and other software in the Python ecosystem.
If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the SQL module with the command pip install pyspark[sql]. Otherwise, you must ensure that PyArrow is installed and available on all cluster nodes. You can install it with pip, or with conda from the conda-forge channel.
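As a quick sanity check (a hedged sketch, not specific to this cluster), you can compare the pyarrow version seen by the driver with the version the executors import inside a task; a newer-than-expected version on the EMR worker nodes is exactly what bites here:

```python
import pyarrow
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Version on the driver.
print("driver pyarrow:", pyarrow.__version__)

# Versions on the executors: import pyarrow inside a trivial task.
def partition_pyarrow_version(_rows):
    import pyarrow
    return [pyarrow.__version__]

executor_versions = set(
    spark.sparkContext.parallelize(range(8), 8)
         .mapPartitions(partition_pyarrow_version)
         .collect()
)
print("executor pyarrow:", executor_versions)
```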
Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer data between JVM and Python processes. This currently is most beneficial to Python users that work with Pandas/NumPy data.
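For completeness, that Arrow-based conversion is switched on per session; in Spark 2.x the property is spark.sql.execution.arrow.enabled (Spark 3.x renames it to spark.sql.execution.arrow.pyspark.enabled). Note that it governs toPandas() and createDataFrame(pandas_df), while Pandas UDFs use Arrow regardless of this flag:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Spark 2.x property name; enables Arrow for toPandas() and
# createDataFrame(pandas_df). Pandas UDFs always use Arrow.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

pdf = spark.range(100).toPandas()  # now uses Arrow for the JVM -> Python transfer
print(type(pdf), len(pdf))
```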
It's not a bug. We made an important protocol change in 0.15.0 that makes the default behavior of pyarrow incompatible with older versions of Arrow in Java -- your Spark environment seems to be using an older version.
Your options are:
- Set the environment variable ARROW_PRE_0_15_IPC_FORMAT=1 from where you are using Python, so pyarrow writes the older IPC format (a sketch of one way to do this in a Spark session follows below).
- Downgrade pyarrow to a pre-0.15 release (e.g. 0.14.1, which you report works) until Spark catches up.
Hopefully the Spark community will be able to upgrade to 0.15.0 in Java soon so this issue goes away.
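A minimal sketch of the first option for a YARN-backed cluster such as EMR, assuming you build the session yourself (the same variable can instead go into conf/spark-env.sh or your spark-defaults; spark.executorEnv.* and spark.yarn.appMasterEnv.* are standard Spark configuration properties):

```python
import os
from pyspark.sql import SparkSession

# Make the local Python (driver) process emit the pre-0.15 Arrow IPC format.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

spark = (
    SparkSession.builder
    # Propagate the flag to the Python workers running on the executors.
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    # In YARN cluster mode the driver lives in the application master,
    # so set it there as well.
    .config("spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    .getOrCreate()
)
```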
This is discussed in http://arrow.apache.org/blog/2019/10/06/0.15.0-release/