I am trying to enable Apache Arrow for conversion to Pandas. I am using:
pyspark 2.4.4, pyarrow 0.15.0, pandas 0.25.1, numpy 1.17.2
This is the example code:
import pandas as pd

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
x = pd.Series([1, 2, 3])
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
I got this warning message:
c:\users\administrator\appdata\local\programs\python\python37\lib\site-packages\pyspark\sql\session.py:714: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true; however, failed by the reason below:
An error occurred while calling z:org.apache.spark.sql.api.python.PythonSQLUtils.readArrowStreamFromFile.
: java.lang.IllegalArgumentException
at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.readNextBatch(ArrowConverters.scala:243)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$3.<init>(ArrowConverters.scala:229)
at org.apache.spark.sql.execution.arrow.ArrowConverters$.getBatchesFromStream(ArrowConverters.scala:228)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$readArrowStreamFromFile$2.apply(ArrowConverters.scala:216)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anonfun$readArrowStreamFromFile$2.apply(ArrowConverters.scala:214)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2543)
at org.apache.spark.sql.execution.arrow.ArrowConverters$.readArrowStreamFromFile(ArrowConverters.scala:214)
at org.apache.spark.sql.api.python.PythonSQLUtils$.readArrowStreamFromFile(PythonSQLUtils.scala:46)
at org.apache.spark.sql.api.python.PythonSQLUtils.readArrowStreamFromFile(PythonSQLUtils.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.
warnings.warn(msg)
Apache Arrow is an in-memory columnar data format used by Spark to efficiently transfer data between JVM and Python processes. It is currently most beneficial to Python users who work with Pandas/NumPy data.
If you install PySpark using pip, PyArrow can be brought in as an extra dependency of the SQL module with the command pip install pyspark[sql]. Otherwise, you must ensure that PyArrow is installed and available on all cluster nodes. You can install it with pip, or with conda from the conda-forge channel.
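As a minimal sketch of what the Arrow optimization accelerates, assuming an existing SparkSession named spark (the config key below is the Spark 2.4 one; Spark 3.x renames it to spark.sql.execution.arrow.pyspark.enabled):

import pandas as pd

# Spark 2.4 key; Spark 3.x uses spark.sql.execution.arrow.pyspark.enabled.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

sdf = spark.range(1000).toDF("x")    # a Spark DataFrame
pdf = sdf.toPandas()                 # JVM -> Python transfer via Arrow
sdf2 = spark.createDataFrame(pdf)    # Python -> JVM transfer via Arrow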
PySpark users can access the full PySpark APIs by calling DataFrame.to_spark(). pandas-on-Spark DataFrames and Spark DataFrames are virtually interchangeable; however, note that a new default index is created when a pandas-on-Spark DataFrame is created from a Spark DataFrame.
So if you cannot find the function you need, you can still do the following: transform the pandas-on-Spark DataFrame into a PySpark DataFrame, perform multiple transformations using the PySpark API, and convert the PySpark DataFrame back into a pandas-on-Spark DataFrame, as sketched below.
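A minimal sketch of that round trip, assuming Spark 3.2+ where the pandas-on-Spark API ships as pyspark.pandas (earlier versions use the separate Koalas package instead):

import pyspark.pandas as ps

# Start from a pandas-on-Spark DataFrame
psdf = ps.DataFrame({"x": [1, 2, 3]})

# 1) Drop down to a plain PySpark DataFrame for APIs not exposed in pyspark.pandas
sdf = psdf.to_spark()

# 2) Apply arbitrary PySpark transformations
sdf = sdf.filter(sdf["x"] > 1)

# 3) Convert back to pandas-on-Spark (a new default index is created here)
psdf2 = sdf.to_pandas_on_spark()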
We made a change in 0.15.0 that makes the default behavior of pyarrow incompatible with older versions of Arrow in Java -- your Spark environment seems to be using an older version.
Your options are to set the environment variable ARROW_PRE_0_15_IPC_FORMAT=1 from where you are using Python, or to downgrade to pyarrow < 0.15.0 for now.
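As an illustration of the first option, here is a minimal sketch of how such a variable is usually exported, both for the local Python process and for executors via Spark's standard spark.executorEnv.* property (as the next answer reports, this does not necessarily reach the UDF workers in every deployment):

import os

# Must be set before any Arrow serialization happens in this process.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

from pyspark.sql import SparkSession

# spark.executorEnv.* passes an environment variable to executor processes;
# on YARN in cluster mode, spark.yarn.appMasterEnv.* covers the driver as well.
spark = (
    SparkSession.builder
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    .getOrCreate()
)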
For calling my pandas UDF in my Spark 2.4.4 cluster with pyarrow==0.15, I struggled to successfully set the ARROW_PRE_0_15_IPC_FORMAT=1 flag as mentioned above. I set the flag (1) on the command line via export on the head node, (2) via spark-env.sh and yarn-env.sh on all nodes in the cluster, and (3) in the pyspark code itself from my script on the head node. None of these worked to actually set this flag inside the UDF, for unknown reasons.
The simplest solution I found was to call this inside the UDF:
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("integer", PandasUDFType.SCALAR)
def foo(*args):
    # Make pyarrow write the pre-0.15 (legacy) IPC format that old Arrow Java understands.
    import os
    os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
    # ...
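A hypothetical usage of this UDF, assuming the elided body returns a pandas Series of integers and df is a Spark DataFrame with an integer column x:

result = df.withColumn("y", foo(df["x"]))
result.show()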
Hopefully this saves someone else several hours.