PySpark UDF high memory utilization

I'm using a UDF written in Python to change the base of a number.

I read a parquet file, apply the UDF, and write the result back out as parquet. Here is the line I run:

input_df.withColumn("origin_base", convert_2_dest_base(input_df.origin_base)).write.mode('overwrite').parquet(destination_path)
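
For context, here is a minimal sketch of what such a UDF might look like; the original conversion logic isn't shown, so the helper name and the target base (36) are assumptions:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Hypothetical conversion helper -- the real convert_2_dest_base isn't shown.
# Converts a non-negative base-10 value to a base-36 string.
def to_base_36(n):
    if n is None:
        return None
    digits = "0123456789abcdefghijklmnopqrstuvwxyz"
    n = int(n)
    out = ""
    while n > 0:
        out = digits[n % 36] + out
        n //= 36
    return out or "0"

convert_2_dest_base = udf(to_base_36, StringType())

A function like this runs once per row in a Python worker process, which is relevant to the memory behavior described below.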

This conversion makes Spark use a lot of memory, and I get warnings like this:

17/06/18 08:05:39 WARN TaskSetManager: Lost task 40.0 in stage 4.0 (TID 183, ip-10-100-5-196.ec2.internal, executor 19): ExecutorLostFailure (executor 19 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 4.4 GB of 4.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

and in the end the job fails.

Is a UDF the wrong approach here? Why does it consume so much memory?

Gluz asked Jun 18 '17

People also ask

Why UDF is not recommended in Spark?

It is well known that the use of UDFs (User Defined Functions) in Apache Spark, especially with the Python API, can compromise application performance. For this reason, at Damavis we try to avoid them as much as possible in favour of native functions or SQL.

Why are UDFs slow in PySpark?

The reason Python UDFs are slow is probably that the PySpark UDF machinery is not implemented in the most optimized way: Spark added a Python API in version 0.7, with support for user-defined functions.

Is UDF good in Spark?

User Defined Functions are an important feature of Spark SQL that helps extend the language with custom constructs. UDFs are very useful for extending Spark's vocabulary but come with significant performance overhead.

Is Panda UDF faster?

A pandas user-defined function (UDF)—also known as vectorized UDF—is a user-defined function that uses Apache Arrow to transfer data and pandas to work with the data. pandas UDFs allow vectorized operations that can increase performance up to 100x compared to row-at-a-time Python UDFs.
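
As a sketch, the row-at-a-time UDF from the question could be rewritten as a pandas UDF (Spark 3 decorator style, which requires PyArrow); it reuses the hypothetical to_base_36 helper from the earlier sketch:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

# Vectorized variant: each call receives a whole pandas Series per Arrow
# batch instead of one row at a time, so data crosses the JVM/Python
# boundary in large columnar chunks.
@pandas_udf(StringType())
def convert_2_dest_base_vec(nums: pd.Series) -> pd.Series:
    return nums.map(to_base_36)  # to_base_36: the hypothetical helper above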


1 Answer

In PySpark, data is processed in Python and cached/shuffled in the JVM. If you use the built-in Python DataFrame API, there is not much performance difference compared to Scala. See python vs scala performance
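
For this particular task there may be no need for a UDF at all: Spark ships a native base-conversion function, pyspark.sql.functions.conv, which runs entirely in the JVM. A sketch, assuming a base-10 to base-36 conversion:

from pyspark.sql.functions import conv

# conv(col, fromBase, toBase) is a built-in Spark SQL function, so no rows
# are shipped to Python workers. The bases (10 -> 36) are assumptions.
input_df.withColumn("origin_base", conv(input_df.origin_base, 10, 36)) \
    .write.mode('overwrite').parquet(destination_path)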


When you use a UDF, your locally defined function is not registered in the JVM, so it cannot be executed via a simple Java API call. Instead, the data has to be serialized and sent to a Python worker, processed there, and then serialized and sent back to the JVM.

The Python worker processes this serialized data in off-heap memory, which can consume a huge amount of it; this is what often leads to exceeding the memoryOverhead limit.

Performance-wise, this serialization is slow, and it is often the key target for performance tuning.
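
If the UDF has to stay, the warning's own advice applies: give each executor more off-heap headroom. A minimal sketch of the relevant settings (the values are illustrative, and they must be set before the application starts, e.g. at submit time):

from pyspark.sql import SparkSession

# Illustrative values -- tune to your cluster. In Spark 2.x on YARN the
# setting is spark.yarn.executor.memoryOverhead (in MB); Spark 3.x renamed
# it to spark.executor.memoryOverhead.
spark = (SparkSession.builder
         .appName("base-conversion")
         .config("spark.yarn.executor.memoryOverhead", "2048")
         .config("spark.executor.memory", "4g")
         .getOrCreate())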

Wong Tat Yau answered Oct 19 '22