I have a Spark Dataframe with some missing values. I would like to perform a simple imputation by replacing the missing values with the mean for that column. I am very new to Spark, so I have been struggling to implement this logic. This is what I have managed to do so far:
a) To do this for a single column (let's say Col A), this line of code seems to work:
df.withColumn("new_Col", when($"ColA".isNull, df.select(mean("ColA"))
.first()(0).asInstanceOf[Double])
.otherwise($"ColA"))
b) However, I have not been able to figure out, how to do this for all the columns in my dataframe. I was trying out the Map function, but I believe it loops through each row of a dataframe
c) There is a similar question on SO - here. And while I liked the solution (using Aggregated tables and coalesce), I was very keen to know if there is a way to do this by looping through each column (I come from R, so looping through each column using a higher order functional like lapply seems more natural to me).
Thanks!
In PySpark, DataFrame. fillna() or DataFrameNaFunctions. fill() is used to replace NULL/None values on all or selected multiple DataFrame columns with either zero(0), empty string, space, or any constant literal values.
In Spark, fill() function of DataFrameNaFunctions class is used to replace NULL values on the DataFrame column with either with zero(0), empty string, space, or any constant literal values.
You can use 'fillna' method for imputing the column 'Loan_Amount_Term' with the median value. In some cases, imputing the values with the previous value instead of mean, mode or median is more appropriate. This is called forward fill. It is mostly used in time series data.
Spark >= 2.2
You can use org.apache.spark.ml.feature.Imputer
(which supports both mean and median strategy).
Scala :
import org.apache.spark.ml.feature.Imputer
val imputer = new Imputer()
.setInputCols(df.columns)
.setOutputCols(df.columns.map(c => s"${c}_imputed"))
.setStrategy("mean")
imputer.fit(df).transform(df)
Python:
from pyspark.ml.feature import Imputer
imputer = Imputer(
inputCols=df.columns,
outputCols=["{}_imputed".format(c) for c in df.columns]
)
imputer.fit(df).transform(df)
Spark < 2.2
Here you are:
import org.apache.spark.sql.functions.mean
df.na.fill(df.columns.zip(
df.select(df.columns.map(mean(_)): _*).first.toSeq
).toMap)
where
df.columns.map(mean(_)): Array[Column]
computes an average for each column,
df.select(_: *).first.toSeq: Seq[Any]
collects aggregated values and converts row to Seq[Any]
(I know it is suboptimal but this is the API we have to work with),
df.columns.zip(_).toMap: Map[String,Any]
creates aMap: Map[String, Any]
which maps from the column name to its average, and finally:
df.na.fill(_): DataFrame
fills the missing values using:
fill: Map[String, Any] => DataFrame
from DataFrameNaFunctions
.
To ingore NaN
entries you can replace:
df.select(df.columns.map(mean(_)): _*).first.toSeq
with:
import org.apache.spark.sql.functions.{col, isnan, when}
df.select(df.columns.map(
c => mean(when(!isnan(col(c)), col(c)))
): _*).first.toSeq
For PySpark, this is the code I used:
mean_dict = { col: 'mean' for col in df.columns }
col_avgs = df.agg( mean_dict ).collect()[0].asDict()
col_avgs = { k[4:-1]: v for k,v in col_avgs.iteritems() }
df.fillna( col_avgs ).show()
The four steps are:
mean_dict
mapping column names to the aggregate operation (mean)col_avgs
col_avgs
start with avg(
and end with )
, e.g. avg(col1)
. Strip the parentheses out.col_avgs
For imputing the median (instead of the mean) in PySpark < 2.2
## filter numeric cols
num_cols = [col_type[0] for col_type in filter(lambda dtype: dtype[1] in {"bigint", "double", "int"}, df.dtypes)]
### Compute a dict with <col_name, median_value>
median_dict = dict()
for c in num_cols:
median_dict[c] = df.stat.approxQuantile(c, [0.5], 0.001)[0]
Then, apply na.fill
df_imputed = df.na.fill(median_dict)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With