
Replace missing values with mean - Spark Dataframe

I have a Spark Dataframe with some missing values. I would like to perform a simple imputation by replacing the missing values with the mean for that column. I am very new to Spark, so I have been struggling to implement this logic. This is what I have managed to do so far:

a) To do this for a single column (let's say ColA), this line of code seems to work:

df.withColumn("new_Col", when($"ColA".isNull, df.select(mean("ColA"))
  .first()(0).asInstanceOf[Double])
  .otherwise($"ColA"))

b) However, I have not been able to figure out how to do this for all the columns in my dataframe. I was trying out the map function, but I believe it loops through each row of a dataframe.

c) There is a similar question on SO - here. And while I liked the solution (using aggregated tables and coalesce), I was very keen to know whether there is a way to do this by looping through each column (I come from R, so looping through each column with a higher-order function like lapply feels more natural to me).

Thanks!

asked Oct 15 '16 by Dataminer

People also ask

How do you replace missing values in PySpark?

In PySpark, DataFrame.fillna() or DataFrameNaFunctions.fill() is used to replace NULL/None values in all or selected DataFrame columns with zero (0), an empty string, a space, or any constant literal value.
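
A minimal PySpark sketch of fillna() with per-column constants (the DataFrame and column names below are made up for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# toy DataFrame with missing values in both columns
df = spark.createDataFrame(
    [(1, "x"), (None, "y"), (3, None)],
    ["num_col", "str_col"]
)

# a dict applies a different replacement constant per column
df.fillna({"num_col": 0, "str_col": "missing"}).show()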

How do I change the NULL value in Spark DataFrame?

In Spark, the fill() function of the DataFrameNaFunctions class is used to replace NULL values in a DataFrame column with zero (0), an empty string, a space, or any constant literal value.

How do you fill NULL or missing values of a dataset other than with the mean, median, or mode?

You can use the fillna method to impute a column such as 'Loan_Amount_Term' with the median value. In some cases, imputing values with the previous value instead of the mean, mode, or median is more appropriate. This is called forward fill, and it is mostly used with time series data.
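
Forward fill is not built into fillna()/na.fill(); in Spark it is usually emulated with a window and last(..., ignorenulls=True). A rough sketch, assuming a hypothetical ordering column ts and value column value:

from pyspark.sql import Window
from pyspark.sql import functions as F

# carry the last non-null value forward, ordered by a timestamp column
# (add partitionBy(...) on large data to avoid pulling everything into one partition)
w = Window.orderBy("ts").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_ffill = df.withColumn("value_ffill", F.last("value", ignorenulls=True).over(w))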


3 Answers

Spark >= 2.2

You can use org.apache.spark.ml.feature.Imputer (which supports both the mean and the median strategy).

Scala:

import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer()
  .setInputCols(df.columns)
  .setOutputCols(df.columns.map(c => s"${c}_imputed"))
  .setStrategy("mean")

imputer.fit(df).transform(df)

Python:

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=df.columns, 
    outputCols=["{}_imputed".format(c) for c in df.columns]
)
imputer.fit(df).transform(df)
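
The same estimator also accepts the median strategy; a short variation (assuming, as above, that all columns in df are numeric, since Imputer only handles numeric input columns):

imputer = Imputer(
    strategy="median",
    inputCols=df.columns,
    outputCols=["{}_imputed".format(c) for c in df.columns]
)
model = imputer.fit(df)   # computes the per-column medians
model.transform(df).show()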

Spark < 2.2

Here you are:

import org.apache.spark.sql.functions.mean

df.na.fill(df.columns.zip(
  df.select(df.columns.map(mean(_)): _*).first.toSeq
).toMap)

where

df.columns.map(mean(_)): Array[Column] 

computes an average for each column,

df.select(_: _*).first.toSeq: Seq[Any]

collects the aggregated values and converts the row to Seq[Any] (I know it is suboptimal but this is the API we have to work with),

df.columns.zip(_).toMap: Map[String,Any] 

creates a Map[String, Any] which maps from each column name to its average, and finally:

df.na.fill(_): DataFrame

fills the missing values using:

fill: Map[String, Any] => DataFrame 

from DataFrameNaFunctions.

To ignore NaN entries you can replace:

df.select(df.columns.map(mean(_)): _*).first.toSeq

with:

import org.apache.spark.sql.functions.{col, isnan, when}


df.select(df.columns.map(
  c => mean(when(!isnan(col(c)), col(c)))
): _*).first.toSeq

answered Sep 23 '22 by zero323

For PySpark, this is the code I used:

mean_dict = { col: 'mean' for col in df.columns }
col_avgs = df.agg( mean_dict ).collect()[0].asDict()
# keys look like 'avg(col1)'; strip the 'avg(' prefix and trailing ')'
col_avgs = { k[4:-1]: v for k,v in col_avgs.items() }  # use iteritems() on Python 2
df.fillna( col_avgs ).show()

The four steps are:

  1. Create the dictionary mean_dict mapping column names to the aggregate operation (mean)
  2. Calculate the mean for each column, and save it as the dictionary col_avgs
  3. The column names in col_avgs start with avg( and end with ), e.g. avg(col1). Strip the avg( prefix and the trailing ) so only the original column names remain.
  4. Fill the columns of the dataframe with the averages using col_avgs
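
One caveat to the snippet above (my addition, not part of the original answer): aggregating with 'mean' over non-numeric columns can produce null averages that fillna cannot apply, so you may want to build the dictionary from numeric columns only, e.g.:

# restrict the aggregation to numeric columns (type names here cover the common numeric types)
numeric_cols = [f.name for f in df.schema.fields
                if f.dataType.typeName() in ("integer", "long", "double", "float")]
mean_dict = {c: 'mean' for c in numeric_cols}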

answered Sep 24 '22 by Michael P

For imputing the median (instead of the mean) in PySpark < 2.2:

# filter numeric columns
num_cols = [col_type[0] for col_type in filter(lambda dtype: dtype[1] in {"bigint", "double", "int"}, df.dtypes)]

# compute a dict of <col_name, median_value>
median_dict = dict()
for c in num_cols:
    median_dict[c] = df.stat.approxQuantile(c, [0.5], 0.001)[0]

Then, apply na.fill:

df_imputed = df.na.fill(median_dict)

answered Sep 24 '22 by noleto