I'm using a UDF written in Python to change the base of a number.
I read a Parquet file, apply the UDF, and write the result back to Parquet. Here is the line I run:
input_df.withColumn("origin_base", convert_2_dest_base(input_df.origin_base)).write.mode('overwrite').parquet(destination_path)
This conversion makes Spark use a lot of memory, and I get warnings like this:
17/06/18 08:05:39 WARN TaskSetManager: Lost task 40.0 in stage 4.0 (TID 183, ip-10-100-5-196.ec2.internal, executor 19): ExecutorLostFailure (executor 19 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 4.4 GB of 4.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
and in the end it fails.
Is a UDF not the right approach here? Why is it consuming so much memory?
It is well known that the use of UDFs (User Defined Functions) in Apache Spark, especially with the Python API, can compromise application performance. For this reason, at Damavis we try to avoid them as much as possible in favour of native functions or SQL.
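For the base conversion in the question specifically, Spark ships a built-in function, conv, that can often replace the UDF entirely. Here is a minimal sketch; the example bases (10 to 16) are assumptions, since the question does not say which bases are involved:

from pyspark.sql import functions as F

# conv(column, fromBase, toBase) runs entirely in the JVM,
# so no Python worker or extra serialization is involved.
result_df = input_df.withColumn("origin_base", F.conv(F.col("origin_base"), 10, 16))
result_df.write.mode("overwrite").parquet(destination_path)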
The reason a Python UDF is slow is probably that PySpark UDFs are not implemented in the most optimized way. As the linked paragraph notes, Spark added a Python API in version 0.7, with support for user-defined functions.
User Defined Functions are an important feature of Spark SQL that help extend the language with custom constructs. UDFs are very useful for extending Spark's vocabulary, but they come with significant performance overhead.
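For context, a plain row-at-a-time Python UDF equivalent to the one in the question might look like this. It is only a sketch: the conversion to hexadecimal and the string return type are assumptions, since the original convert_2_dest_base is not shown.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def to_dest_base(value):
    # Hypothetical conversion: decimal string -> hexadecimal string
    return format(int(value), "x")

# Every single value is pickled, sent to a Python worker, converted,
# and sent back to the JVM.
convert_2_dest_base = udf(to_dest_base, StringType())

result_df = input_df.withColumn("origin_base", convert_2_dest_base("origin_base"))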
A pandas user-defined function (UDF)—also known as vectorized UDF—is a user-defined function that uses Apache Arrow to transfer data and pandas to work with the data. pandas UDFs allow vectorized operations that can increase performance up to 100x compared to row-at-a-time Python UDFs.
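As a sketch, the same conversion rewritten as a pandas UDF could look like the following. This uses the Spark 3.x type-hint syntax (on Spark 2.3/2.4 the decorator also takes PandasUDFType.SCALAR), and the hexadecimal conversion is again only an assumption:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def convert_2_dest_base_vec(values: pd.Series) -> pd.Series:
    # Whole batches of rows are exchanged with the Python worker via Apache Arrow
    # instead of pickling one value at a time.
    return values.map(lambda v: format(int(v), "x"))

result_df = input_df.withColumn("origin_base", convert_2_dest_base_vec("origin_base"))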
In PySpark, data is processed in Python and cached/shuffled in the JVM. If you only use the built-in DataFrame API, there is not much performance difference compared to Scala. See python vs scala performance.
When you use a UDF, your locally defined function is not registered in the native JVM structures, so it cannot be executed as a simple Java API call; instead, the data has to be serialized and sent to a Python worker, processed there, and then serialized and sent back to the JVM.
The Python worker then has to process the serialized data in off-heap memory; this consumes a lot of off-heap memory and often leads to memoryOverhead errors.
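If you keep the Python UDF, the usual first mitigation for the YARN error above is to raise the off-heap allowance and bound the Python workers. In YARN mode these properties normally need to be set at submit time rather than in code; the values below are only illustrative:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Property name as in the log; on Spark 2.3+ it is also available as spark.executor.memoryOverhead.
    .config("spark.yarn.executor.memoryOverhead", "1024")
    .config("spark.executor.memory", "4g")
    # Limit how much memory each Python worker uses before spilling to disk.
    .config("spark.python.worker.memory", "512m")
    .getOrCreate()
)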
Performance-wise, serialization is slow, and it is often the key to performance tuning.
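One knob that targets serialization directly is Arrow-based columnar transfer between the JVM and Python; the property name depends on the Spark version, as noted in the comment:

# Use Apache Arrow for DataFrame <-> pandas conversions
# (pandas UDFs already exchange batches via Arrow).
# On Spark 2.3/2.4 the property is spark.sql.execution.arrow.enabled.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")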