Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How does foldLeft in Scala work on DataFrame?

Tags:

scala

I had a requirement to ingest an RDBMS table into Hive and I had to clean the data in its String columns, before inserting it into a Hive table using a regex_replace pattern. After failing to understand how to apply it on my dataFrame, I finally I came across a method in Scala which is foldLeft which helped in fulfilling the requirement.

I understand how foldLeft works on a collection, for example:

List(1,3,9).foldLeft(100)((x,y) => x+y)

foldLeft takes arguments: initialValue and a function. It adds the result of the function to the accumulator. In the above case, the result is: 113.

But when it comes to dataframe, I am unable to understand how it works.

val stringColumns = yearDF.schema.fields.filter(_.dataType == StringType).map(_.name)
val finalDF = stringColumns.foldLeft(yearDF){ (tempdf, colName) => tempdf.withColumn(colName, regexp_replace(col(colName), "\n", "")) }

In the above code, I got the String columns from the dataFrame: yearDF which is kept in the accumulator of the foldLeft. I have the following doubts regarding the function used in foldLeft:

  1. What value does tempDF hold ? If it is the same as yearDF, how is it mapped to yearDF ?
  2. If withColumns is used in the function and the result is added to yearDF, how come it is not creating duplicating columns when

Could anyone explain it, so that I can have a better understanding about foldLeft.

like image 352
Metadata Avatar asked Aug 26 '18 10:08

Metadata


1 Answers

Consider a trivialized foldLeft example more similar to your DataFrame version:

List(3, 2, 1).foldLeft("abcde")((acc, x) => acc.take(x))

If you look closely at what the (acc, x) => acc.take(x) function does in each iteration, the foldLeft is no difference from the following:

"abcde".take(3).take(2).take(1)
// Result: "a"

Going back to the foldLeft for your DataFrame:

stringColumns.foldLeft(yearDF){ (tempdf, colName) =>
  tempdf.withColumn(colName, regexp_replace(col(colName), "\n", ""))
}

Similarly it's no difference from:

val sz = stringColumns.size

yearDF.
  withColumn(stringColumns(0), regexp_replace(col(stringColumns(0)), "\n", "")).
  withColumn(stringColumns(1), regexp_replace(col(stringColumns(1)), "\n", "")).
  ...
  withColumn(stringColumns(sz - 1), regexp_replace(col(stringColumns(sz - 1)), "\n", ""))
  1. What value does tempDF hold ? If it is the same as yearDF, how is it mapped to yearDF ?

In each iteration (i = 0, 1, 2, ...), tempDF holds a new DataFrame transformed from applying withColumn(stringColumns(i), ...), starting from yearDF

  1. If withColumns is used in the function and the result is added to yearDF, how come it is not creating duplicating columns when

From withColumn(stringColumns(i), regexp_replace(col(stringColumns(i)), "\n", "")), method withColumn creates a new DataFrame, "adding" a column with the same name as the column stringColumns(i) it derives from, thus essentially resulting in a new DataFrame with the same column list as the original yearDF.

like image 176
Leo C Avatar answered Oct 22 '22 00:10

Leo C