StackOverflowError while processing several columns with a UDF

I have a DataFrame with many columns of str type, and I want to apply a function to all of those columns without renaming them or adding new columns. I tried a for-in loop that executes withColumn (see the example below), but when I run the code it usually throws a StackOverflowError (it rarely works). This DataFrame is not big at all; it has only ~15,000 records.

# df is a DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def lowerCase(string):
    return string.strip().lower()

lowerCaseUDF = udf(lowerCase, StringType())

# Overwrite each string column in place with its trimmed, lower-cased value
for (columnName, kind) in df.dtypes:
    if kind == "string":
        df = df.withColumn(columnName, lowerCaseUDF(df[columnName]))

df.select("Tipo_unidad").distinct().show()

The complete error is very long, so I pasted only a few lines; you can find the full trace here: Complete Trace

Py4JJavaError: An error occurred while calling o516.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 4 times, most recent failure: Lost task 1.3 in stage 2.0 (TID 38, worker2.mcbo.mood.com.ve): java.lang.StackOverflowError
        at java.io.ObjectInputStream$BlockDataInputStream.readByte(ObjectInputStream.java:2774)

I think this problem occurs because the code launches many jobs (one per string column). Could you show me an alternative, or tell me what I am doing wrong?

asked Jan 28 '16 by Alberto Bonsanto


People also ask

Can UDF return multiple columns?

A UDF can return only a single column at a time.
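That single column can still carry multiple values as a struct, which you then unpack. A minimal sketch, assuming a SparkSession named spark; the split_name helper and the first/last field names are made up for illustration:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType

# The UDF returns one struct column with two fields
schema = StructType([
    StructField("first", StringType()),
    StructField("last", StringType()),
])

def split_name(full):
    first, _, last = full.partition(" ")
    return (first, last)

split_udf = udf(split_name, schema)

df2 = spark.createDataFrame([("Jane Doe",)], ["name"])
df2 = df2.withColumn("parts", split_udf(col("name")))
# Unpack the struct fields into separate columns
df2.select("name", col("parts.first"), col("parts.last")).show()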

How do I apply a function to multiple columns in PySpark?

You can use reduce, for loops, or list comprehensions to apply PySpark functions to multiple columns in a DataFrame. Using iterators to apply the same operation to multiple columns is vital for maintaining a DRY codebase.
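As a minimal sketch of those three idioms, here is a single trim applied to every string column of a DataFrame df (built-in functions only, no UDF):

from functools import reduce
from pyspark.sql.functions import col, trim

string_cols = [c for c, t in df.dtypes if t == "string"]

# reduce: fold withColumn over the list of columns
df_r = reduce(lambda acc, c: acc.withColumn(c, trim(col(c))), string_cols, df)

# for loop: the same thing written imperatively
df_f = df
for c in string_cols:
    df_f = df_f.withColumn(c, trim(col(c)))

# list comprehension: build all expressions first, then one select
df_l = df.select(*[trim(col(c)).alias(c) if t == "string" else col(c)
                   for c, t in df.dtypes])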

Can PySpark UDF return DataFrame?

A PySpark UDF is a User Defined Function used to create a reusable function in Spark. Once created, a UDF can be reused across multiple DataFrames and in SQL (after registering). The default return type of udf() is StringType.
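A minimal sketch of that reuse (Spark 2.x API), assuming a SparkSession named spark and a DataFrame df with a name column; the view and function names are made up:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def normalize(s):
    return s.strip().lower()

# For the DataFrame API: wrap once, reuse on any DataFrame
normalize_udf = udf(normalize, StringType())
df.select(normalize_udf(df["name"]).alias("name"))

# For SQL: register under a name, then call it in queries
spark.udf.register("normalize", normalize, StringType())
df.createOrReplaceTempView("people")
spark.sql("SELECT normalize(name) AS name FROM people").show()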

How do you add multiple columns in PySpark?

You can add multiple columns to a Spark DataFrame in several ways; to add a known set of columns, you can simply chain withColumn() or use a single select().
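A minimal sketch of both styles, assuming df has a name column; the new column names are made up:

from pyspark.sql.functions import col, length, lit

# Chaining withColumn(): one call per new column
df_a = (df
        .withColumn("source", lit("survey"))
        .withColumn("name_len", length(col("name"))))

# Equivalent single select(): all new columns in one projection
df_b = df.select("*",
                 lit("survey").alias("source"),
                 length(col("name")).alias("name_len"))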


1 Answer

Try something like this:

from pyspark.sql.functions import col, lower, trim

# One expression per column: lower-case and trim the string columns,
# pass every other column through unchanged
exprs = [
    lower(trim(col(c))).alias(c) if t == "string" else col(c)
    for (c, t) in df.dtypes
]

df.select(*exprs)
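Applied to the original example, assign the result back and the final show() from the question works unchanged:

df = df.select(*exprs)
df.select("Tipo_unidad").distinct().show()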

This approach has two main advantages over your current solution:

  • it requires only a single projection (no growing lineage, which is most likely responsible for the StackOverflowError) instead of one projection per string column.
  • it operates directly on the internal representation without passing data to Python (BatchPythonProcessing). You can verify the difference in the query plans with the sketch below.
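To compare the plans, a sketch assuming df_loop is the DataFrame built with the withColumn loop from the question (exact output varies by Spark version):

# Loop version: one Project node stacked per string column
df_loop.explain()

# Single-select version: one Project node over all columns
df.select(*exprs).explain()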
answered Sep 28 '22 by zero323