
How to solve pyspark `org.apache.arrow.vector.util.OversizedAllocationException` error by increasing spark's memory?

I'm running a job in pyspark where at one point I use a grouped aggregate Pandas UDF. This results in the following (here abbreviated) error:

org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer

I'm fairly sure this is because one of the groups the pandas UDF receives is huge, and if I reduce the dataset and remove enough rows I can run my UDF without problems. However, I want to run with my original dataset, and even if I run this spark job on a machine with 192.0 GiB of RAM I still get the same error. (And 192.0 GiB should be enough to hold the whole dataset in memory.)

How can I give spark enough memory to be able to run grouped aggregate Pandas UDFs that require a lot of memory?

For example, is there some spark configuration I'm missing that gives more memory to Apache Arrow?
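For context, these are the kinds of settings I would normally reach for (the memory values are placeholders, not a recommendation):

```shell
# Sketch of a spark-submit invocation with the memory-related knobs;
# "my_job.py" and the sizes are illustrative.
spark-submit \
  --conf spark.executor.memory=32g \
  --conf spark.executor.memoryOverhead=8g \
  --conf spark.sql.execution.arrow.maxRecordsPerBatch=5000 \
  my_job.py
```

Note that, as far as I understand, `spark.sql.execution.arrow.maxRecordsPerBatch` only limits batch sizes for plain Arrow conversions such as `toPandas()`; for grouped Pandas UDFs each whole group is sent as a single Arrow batch, so it would not split an oversized group.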

Longer error message

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
 in 
----> 1 device_attack_result.count()
      2 
      3 
      4 

/usr/lib/spark/python/pyspark/sql/dataframe.py in count(self)
    520         2
    521         """
--> 522         return int(self._jdf.count())
    523 
    524     @ignore_unicode_prefix

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o818.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 102 in stage 27.0 failed 4 times, most recent failure: Lost task 102.3 in stage 27.0 (TID 3235, ip-172-31-111-163.ec2.internal, executor 1): org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
...

Full error message here.

Rasmus Bååth asked Dec 03 '22

2 Answers

Spark's Pandas UDF functionality uses the Arrow framework to convert a Spark DataFrame to a pandas DataFrame. Arrow's internal buffer limit is only 2 GB at this point, so each group produced by your group-by condition must not contain more than 2 GB of data uncompressed.

df.groupby('id').apply(function)

In other words, you can run your pandas UDF only if each group-by partition is less than 2 GB uncompressed.
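One way to sanity-check this is to measure how much memory each group would occupy as a pandas DataFrame on a sample of your data. A plain-pandas sketch (the frame and column names are made up):

```python
import pandas as pd

# Toy frame standing in for a sample of the Spark DataFrame.
df = pd.DataFrame({
    "id": ["a", "a", "a", "b"],
    "value": [1.0, 2.0, 3.0, 4.0],
})

# Bytes each group would occupy as an uncompressed pandas DataFrame;
# any group approaching 2 GB (2**31 bytes) will overflow Arrow's buffer.
group_bytes = df.groupby("id").apply(
    lambda g: int(g.memory_usage(deep=True).sum())
)

largest = int(group_bytes.max())
print(largest < 2**31)
```

On a real dataset you would compute the sample with Spark (e.g. `df.sample(...).toPandas()`) and extrapolate from the sample fraction.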

Here is the ticket for reference:

https://issues.apache.org/jira/browse/ARROW-4890

The issue above appears to be resolved in pyarrow >= 0.15, and only Spark 3.x uses pyarrow 0.15.

Suresh answered Dec 21 '22


Arrow 0.16 changed the maximum buffer allocation size from MaxInteger to MaxLong (64 bits): https://issues.apache.org/jira/browse/ARROW-6112

As of July 2020, upstream Spark is still based on Arrow 0.15: https://github.com/apache/spark/blob/master/python/setup.py

Netty backing buffers still don't support this, though, so chances are you will still hit that issue, just as a different exception.

So as of now this is still not possible due to the above restrictions.

This might get fixed on the Spark side: https://issues.apache.org/jira/browse/SPARK-32294. The idea is to feed GroupedData into a pandas UDF in batches to solve this issue.
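Until something like that lands, a common workaround is to "salt" oversized keys so that each sub-group stays under the limit, then combine the partial results in a second aggregation. A plain-pandas sketch of the two-stage pattern (column names and the sum aggregate are illustrative; in Spark, stage 1 would be the pandas UDF over `groupby(["id", "salt"])`):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "id": ["a"] * 6 + ["b"] * 2,
    "value": [1, 2, 3, 4, 5, 6, 7, 8],
})

# Pick N_SALTS so each (id, salt) sub-group fits in Arrow's 2 GB buffer.
N_SALTS = 2
df["salt"] = np.arange(len(df)) % N_SALTS

# Stage 1: partial aggregate per (id, salt).
partial = df.groupby(["id", "salt"], as_index=False)["value"].sum()

# Stage 2: combine the partials per id. This only works for decomposable
# aggregates (sum, count, min, max) -- not for holistic ones like median.
final = partial.groupby("id", as_index=False)["value"].sum()

print(final)
```

The trade-off is an extra shuffle and the restriction to aggregates that can be computed from partial results.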

Update: PySpark on the Databricks platform doesn't have this issue; it requires DBR 7.4+.

Tagar answered Dec 21 '22