Replacing null values with 0 after spark dataframe left outer join

I have two dataframes called left and right.

scala> left.printSchema
root
|-- user_uid: double (nullable = true)
|-- labelVal: double (nullable = true)
|-- probability_score: double (nullable = true)

scala> right.printSchema
root
|-- user_uid: double (nullable = false)
|-- real_labelVal: double (nullable = false)

Then I join them with a left outer join to get the joined DataFrame. Anyone interested in the natjoin function can find it here (a rough sketch of the idea follows the link):

https://gist.github.com/anonymous/f02bd79528ac75f57ae8
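
In case that link ever goes stale: a natural join typically joins on every column name the two DataFrames share (here, user_uid). Below is a minimal sketch of the idea, assuming the Seq-based join overload available in later Spark versions; it is not necessarily the gist's exact code:

 import org.apache.spark.sql.DataFrame

 // Assumed sketch of a natural-join helper: join on every column name
 // the two DataFrames have in common, keeping one copy of each.
 def natjoin(left: DataFrame, right: DataFrame, joinType: String): DataFrame = {
   val commonCols = left.columns.intersect(right.columns).toSeq
   left.join(right, commonCols, joinType)
 }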

scala> val joinedData = natjoin(predictionDataFrame, labeledObservedDataFrame, "left_outer")

scala> joinedData.printSchema
root
|-- user_uid: double (nullable = true)
|-- labelVal: double (nullable = true)
|-- probability_score: double (nullable = true)
|-- real_labelVal: double (nullable = false)

Since it is a left outer join, the real_labelVal column has nulls when user_uid is not present in right.

scala> val realLabelVal = joinedData.select("real_labelval").distinct.collect
realLabelVal: Array[org.apache.spark.sql.Row] = Array([0.0], [null])

I want to replace the null values in the real_labelVal column with 1.0.

Currently I do the following:

  1. I find the index of the real_labelval column and use the org.apache.spark.sql.Row API to set the nulls to 1.0. (This gives me an RDD[Row].)
  2. Then I apply the schema of the joined DataFrame to get the cleaned DataFrame.

The code is as follows:

 import org.apache.spark.sql.Row

 // real_labelVal is the fourth column in the joined schema
 val real_labelval_index = 3

 def replaceNull(row: Row) = {
   val rowArray = row.toSeq.toArray
   rowArray(real_labelval_index) = 1.0
   Row.fromSeq(rowArray)
 }

 // Replace nulls row by row, then re-apply the original schema
 val cleanRowRDD = joinedData.map(row => if (row.isNullAt(real_labelval_index)) replaceNull(row) else row)
 val cleanJoined = sqlContext.createDataFrame(cleanRowRDD, joinedData.schema)

Is there an elegant or efficient way to do this?

Googling hasn't helped much. Thanks in advance.

Mihir Shinde
asked Aug 04 '15 at 01:08


1 Answer

Have you tried using the na functions?

joinedData.na.fill(1.0, Seq("real_labelval"))
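
As a usage sketch (the expected output below is inferred from the distinct values shown in the question, not verified): na.fill returns a new DataFrame and only touches the listed columns, so nothing else in joinedData is affected.

 // Fill nulls in real_labelval with 1.0; all other columns are untouched.
 val cleanJoined = joinedData.na.fill(1.0, Seq("real_labelval"))

 // The distinct values should now be 0.0 and 1.0, with no null:
 cleanJoined.select("real_labelval").distinct.collect
 // e.g. Array([0.0], [1.0])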
Justin Pihony
answered Oct 22 '22 at 21:10