Spark window function on dataframe with large number of columns

I have an ML DataFrame which I read from CSV files. It contains three types of columns:

ID Timestamp Feature1 Feature2...Feature_n

where n is ~500 (500 features in ML parlance). The total number of rows in the dataset is ~160 million.

As this is the result of a previous full join, there are many features which do not have values set.

My aim is to run a "fill" function (fillna-style, from Python pandas), where each empty feature value gets set to the previously available value for that column, per ID and Date.

I am trying to achieve this with the following Spark 2.2.1 code:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{coalesce, col, last}

    val rawDataset = sparkSession.read.option("header", "true").csv(inputLocation)

    // look back at up to 50,000 preceding rows within each ID, ordered by DATE
    val window = Window.partitionBy("ID").orderBy("DATE").rowsBetween(-50000, -1)

    val columns = Array(...) // first 30 columns initially, just to see it working

    // for each column, keep the existing value or fall back to the last non-null value in the window
    val rawDataSetFilled = columns.foldLeft(rawDataset) { (originalDF, columnToFill) =>
      originalDF.withColumn(columnToFill, coalesce(col(columnToFill), last(col(columnToFill), ignoreNulls = true).over(window)))
    }

I am running this job on 4 m4.large instances on Amazon EMR, with Spark 2.2.1 and dynamic allocation enabled.

The job runs for over 2h without completing.

Am I doing something wrong at the code level? Given the size of the data and the instances, I would assume it should finish in a reasonable amount of time. And I haven't even tried with the full 500 columns, just with about 30!

Looking in the container logs, all I see are many logs like this:

INFO codegen.CodeGenerator: Code generated in 166.677493 ms

INFO execution.ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

I have tried setting the parameter spark.sql.windowExec.buffer.spill.threshold to something larger, without any impact. Is there some other setting I should know about? Those two lines are the only ones I see in any container log.
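For reference, a setting like that would typically be applied either at submit time or on the session before the query runs; the threshold value below is just an illustrative placeholder, not a recommendation:

    // illustrative placeholder value; tune as needed
    sparkSession.conf.set("spark.sql.windowExec.buffer.spill.threshold", "131072")

    // or at submit time:
    // spark-submit --conf spark.sql.windowExec.buffer.spill.threshold=131072 ...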

In Ganglia, I see most of the CPU cores peaking around full usage, but the memory usage is lower than the maximum available. All executors are allocated and are doing work.

asked Feb 19 '18 by cristi.calugaru

People also ask

How can I show more than 20 rows in Spark?

By default, Spark with Scala, Java, or Python (PySpark) fetches only 20 rows from DataFrame show(), and column values are truncated to 20 characters. In order to display more than 20 rows, or full column values, from a Spark/PySpark DataFrame, you need to pass arguments to the show() method.
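For example (a minimal sketch, where df stands in for any DataFrame):

    // show up to 100 rows, without truncating column values
    df.show(100, truncate = false)
    // PySpark equivalent: df.show(100, truncate=False)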

How does window function work in Spark?

Window functions allow users of Spark SQL to calculate results such as the rank of a given row or a moving average over a range of input rows. They significantly improve the expressiveness of Spark's SQL and DataFrame APIs.
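A minimal sketch of both cases, assuming a DataFrame df with illustrative columns "ID", "DATE" and "value":

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{avg, rank}

    val w = Window.partitionBy("ID").orderBy("DATE")

    val ranked = df
      .withColumn("rank", rank().over(w))                                // rank of each row within its ID partition
      .withColumn("moving_avg", avg("value").over(w.rowsBetween(-2, 0))) // average of the current and two preceding rows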

What is window aggregation?

You use window aggregates (also called window analytic functions) to compute an aggregate value that is based on a group of rows, which are defined by a window. The window determines the range of rows the system uses to conduct the calculations.
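Continuing the same illustrative df, a running total per ID is a typical window aggregate, with the frame spelling out exactly which rows feed the sum:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.sum

    // cumulative frame: from the start of the ID partition up to the current row
    val cumulative = Window.partitionBy("ID").orderBy("DATE")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    val withRunningTotal = df.withColumn("running_total", sum("value").over(cumulative))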


1 Answer

I have managed to rewrite the fold-left logic without using withColumn calls. Apparently they can be very slow for a large number of columns, and I was also getting StackOverflowErrors because of that.

I would be curious to know why there is such a massive difference, and what exactly happens behind the scenes with the query plan execution that makes repeated withColumn calls so slow.

Links which proved very helpful: a Spark Jira issue and this Stack Overflow question.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{coalesce, col, last}

    var rawDataset = sparkSession.read.option("header", "true").csv(inputLocation)
    // frame runs from the start of each ID partition up to the current row, ordered by DATE
    val window = Window.partitionBy("ID").orderBy("DATE").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    // build all the coalesce expressions up front and apply them in a single select
    rawDataset = rawDataset.select(rawDataset.columns.map(column => coalesce(col(column), last(col(column), ignoreNulls = true).over(window)).alias(column)): _*)
    rawDataset.write.option("header", "true").csv(outputLocation)
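As far as I understand it, each withColumn call adds another projection to the logical plan, so folding over hundreds of columns produces a very deep plan that the analyzer has to re-process at every step, which is also what eventually triggers the StackOverflowError; building all the coalesce expressions once and passing them to a single select keeps the plan to a single projection.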
answered Oct 12 '22 by cristi.calugaru