I'm running a job in PySpark where, at one point, I use a grouped aggregate Pandas UDF. This results in the following (abbreviated here) error:
org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
I'm fairly sure this is because one of the groups the Pandas UDF receives is huge; if I reduce the dataset and remove enough rows, I can run my UDF with no problems. However, I want to run with my original dataset, and even when I run this Spark job on a machine with 192.0 GiB of RAM I still get the same error (and 192.0 GiB should be enough to hold the whole dataset in memory).
How can I give Spark enough memory to run grouped aggregate Pandas UDFs that require a lot of memory? For example, is there some Spark configuration I'm missing that gives more memory to Apache Arrow?
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
in
----> 1 device_attack_result.count()
2
3
4
/usr/lib/spark/python/pyspark/sql/dataframe.py in count(self)
520 2
521 """
--> 522 return int(self._jdf.count())
523
524 @ignore_unicode_prefix
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o818.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 102 in stage 27.0 failed 4 times, most recent failure: Lost task 102.3 in stage 27.0 (TID 3235, ip-172-31-111-163.ec2.internal, executor 1): org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
...
Spark's Pandas UDF functionality uses the Arrow framework to convert the Spark DataFrame to a pandas DataFrame, and Arrow's internal buffer limit is only 2 GB at this point, so each group produced by your Pandas UDF's groupby must not exceed 2 GB of data uncompressed.
df.groupby('id').apply(function)
In other words, you can run your Pandas UDF only if each group produced by the groupby is less than 2 GB uncompressed.
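To make that concrete, here is a minimal sketch of the grouped-map pattern from the snippet above, together with a rough pre-flight check for oversized groups. The DataFrame df, the grouping column id, and the value column v are illustrative assumptions, not taken from the question:

from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Hypothetical grouped-map UDF: each group arrives as ONE pandas DataFrame,
# i.e. one Arrow record batch, which is where the 2 GB limit bites.
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

result = df.groupby("id").apply(normalize)

# Rough pre-flight check: row counts per group. Multiplying the largest
# count by your approximate row width in bytes estimates the uncompressed
# size of the biggest Arrow batch.
df.groupBy("id").count().orderBy(F.desc("count")).show(10)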
Here is the Arrow ticket for reference:
https://issues.apache.org/jira/browse/ARROW-4890
The issue above appears to be resolved in pyarrow >= 0.15, but only Spark 3.x picks up pyarrow 0.15.
Arrow 0.16 changed the max buffer allocation size from MaxInteger to MaxLong (64 bits): https://issues.apache.org/jira/browse/ARROW-6112
As of July 2020, upstream Spark is still based on Arrow 0.15: https://github.com/apache/spark/blob/master/python/setup.py
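A quick way to confirm which versions you are actually running (standard version attributes, nothing project-specific):

import pyarrow
import pyspark

print(pyspark.__version__)  # needs to be 3.x to pick up pyarrow >= 0.15
print(pyarrow.__version__)  # the 2 GB buffer fix landed in 0.15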
Netty-backed buffers still don't support this, though, so chances are you will still hit the same limit, surfacing as a different exception. So, as of now, this is still not possible due to the restrictions above.
This might get fixed on the Spark side: https://issues.apache.org/jira/browse/SPARK-32294. The idea is to feed the GroupedData into the pandas UDF in batches to solve the issue.
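Until something like that lands, one workaround (my own sketch, not from the ticket, and only valid when the aggregation can be computed on sub-groups and then recombined, e.g. sums or counts) is to salt the grouping key so no single group exceeds the buffer limit:

from pyspark.sql import functions as F

N_SALT = 16  # assumed fan-out; tune so each sub-group stays well under 2 GB

salted = df.withColumn("salt", (F.rand() * N_SALT).cast("int"))

# First pass: aggregate per (id, salt); each Arrow batch is roughly 1/16 the size.
partial = salted.groupBy("id", "salt").agg(F.sum("v").alias("partial_sum"))

# Second pass: recombine the small per-id partial results.
result = partial.groupBy("id").agg(F.sum("partial_sum").alias("total"))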
Update: PySpark on the Databricks platform doesn't have this issue; it requires DBR 7.4+.