How to filter out a null value from a Spark DataFrame

I created a DataFrame in Spark with the following schema:

root
 |-- user_id: long (nullable = false)
 |-- event_id: long (nullable = false)
 |-- invited: integer (nullable = false)
 |-- day_diff: long (nullable = true)
 |-- interested: integer (nullable = false)
 |-- event_owner: long (nullable = false)
 |-- friend_id: long (nullable = false)

And the data is shown below:

+----------+----------+-------+--------+----------+-----------+---------+
|   user_id|  event_id|invited|day_diff|interested|event_owner|friend_id|
+----------+----------+-------+--------+----------+-----------+---------+
|   4236494| 110357109|      0|      -1|         0|  937597069|     null|
|  78065188| 498404626|      0|       0|         0| 2904922087|     null|
| 282487230|2520855981|      0|      28|         0| 3749735525|     null|
| 335269852|1641491432|      0|       2|         0| 1490350911|     null|
| 437050836|1238456614|      0|       2|         0|  991277599|     null|
| 447244169|2095085551|      0|      -1|         0| 1579858878|     null|
| 516353916|1076364848|      0|       3|         1| 3597645735|     null|
| 528218683|1151525474|      0|       1|         0| 3433080956|     null|
| 531967718|3632072502|      0|       1|         0| 3863085861|     null|
| 627948360|2823119321|      0|       0|         0| 4092665803|     null|
| 811791433|3513954032|      0|       2|         0|  415464198|     null|
| 830686203|  99027353|      0|       0|         0| 3549822604|     null|
|1008893291|1115453150|      0|       2|         0| 2245155244|     null|
|1239364869|2824096896|      0|       2|         1| 2579294650|     null|
|1287950172|1076364848|      0|       0|         0| 3597645735|     null|
|1345896548|2658555390|      0|       1|         0| 2025118823|     null|
|1354205322|2564682277|      0|       3|         0| 2563033185|     null|
|1408344828|1255629030|      0|      -1|         1|  804901063|     null|
|1452633375|1334001859|      0|       4|         0| 1488588320|     null|
|1625052108|3297535757|      0|       3|         0| 1972598895|     null|
+----------+----------+-------+--------+----------+-----------+---------+

I want to filter out the rows that have null values in the "friend_id" field.

scala> val aaa = test.filter("friend_id is null")

scala> aaa.count

I got res52: Long = 0, which is obviously not right. What is the right way to do this?

One more question: I want to replace the values in the friend_id field, with 0 for null and 1 for any other value. The code I came up with is:

val aaa = train_friend_join.select($"user_id", $"event_id", $"invited", $"day_diff", $"interested", $"event_owner", ($"friend_id" != null)?1:0)

This code doesn't work either. Can anyone tell me how to fix it? Thanks.

Steven Li asked Sep 27 '16 14:09


11 Answers

Let's say you have this data setup (so that results are reproducible):

// declaring data types
case class Company(cName: String, cId: String, details: String)
case class Employee(name: String, id: String, email: String, company: Company)

// setting up example data
val e1 = Employee("n1", null, "[email protected]", Company("c1", "1", "d1"))
val e2 = Employee("n2", "2", "[email protected]", Company("c1", "1", "d1"))
val e3 = Employee("n3", "3", "[email protected]", Company("c1", "1", "d1"))
val e4 = Employee("n4", "4", "[email protected]", Company("c2", "2", "d2"))
val e5 = Employee("n5", null, "[email protected]", Company("c2", "2", "d2"))
val e6 = Employee("n6", "6", "[email protected]", Company("c2", "2", "d2"))
val e7 = Employee("n7", "7", "[email protected]", Company("c3", "3", "d3"))
val e8 = Employee("n8", "8", "[email protected]", Company("c3", "3", "d3"))
val employees = Seq(e1, e2, e3, e4, e5, e6, e7, e8)
val df = sc.parallelize(employees).toDF

Data is:

+----+----+---------+---------+
|name|  id|    email|  company|
+----+----+---------+---------+
|  n1|null|[email protected]|[c1,1,d1]|
|  n2|   2|[email protected]|[c1,1,d1]|
|  n3|   3|[email protected]|[c1,1,d1]|
|  n4|   4|[email protected]|[c2,2,d2]|
|  n5|null|[email protected]|[c2,2,d2]|
|  n6|   6|[email protected]|[c2,2,d2]|
|  n7|   7|[email protected]|[c3,3,d3]|
|  n8|   8|[email protected]|[c3,3,d3]|
+----+----+---------+---------+

To filter for employees with null ids:

df.filter("id is null").show

which correctly shows the following:

+----+----+---------+---------+
|name|  id|    email|  company|
+----+----+---------+---------+
|  n1|null|[email protected]|[c1,1,d1]|
|  n5|null|[email protected]|[c2,2,d2]|
+----+----+---------+---------+

For the second part of your question, you can replace the null ids with 0 and all other values with 1 like this:

import org.apache.spark.sql.functions.when
df.withColumn("id", when($"id".isNull, 0).otherwise(1)).show

This results in:

+----+---+---------+---------+
|name| id|    email|  company|
+----+---+---------+---------+
|  n1|  0|[email protected]|[c1,1,d1]|
|  n2|  1|[email protected]|[c1,1,d1]|
|  n3|  1|[email protected]|[c1,1,d1]|
|  n4|  1|[email protected]|[c2,2,d2]|
|  n5|  0|[email protected]|[c2,2,d2]|
|  n6|  1|[email protected]|[c2,2,d2]|
|  n7|  1|[email protected]|[c3,3,d3]|
|  n8|  1|[email protected]|[c3,3,d3]|
+----+---+---------+---------+
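
For the column names in the question, the same pattern might look like the sketch below. The two sample rows are made up for illustration and the object name FriendFlag is hypothetical. The original attempt, ($"friend_id" != null)?1:0, cannot work because Scala has no ?: operator, and != there is an ordinary Scala comparison of the Column object against null rather than a per-row SQL null check.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.when

// Hypothetical, self-contained sketch; names and data are illustrative only.
object FriendFlag {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("friend-flag").getOrCreate()
    import spark.implicits._

    // Option[Long] produces a nullable friend_id column.
    val df = Seq(
      (4236494L, Option.empty[Long]),
      (78065188L, Some(42L))
    ).toDF("user_id", "friend_id")

    // null becomes 0, any other value becomes 1.
    val flagged = df.withColumn("friend_id", when($"friend_id".isNull, 0).otherwise(1))
    flagged.show()

    spark.stop()
  }
}
```

Using withColumn with the existing column name replaces that column in place, so there is no need to list all the other columns in a select.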

Sachin Tyagi answered Sep 30 '22 05:09


Or use isNotNull: df.filter($"friend_id".isNotNull)

Adriana Lazar answered Sep 30 '22 06:09


df.where(df.col("friend_id").isNull)

Michael Kopaniov answered Sep 30 '22 06:09


There are two ways to do it: build the filter condition 1) manually or 2) dynamically.

Sample DataFrame:

val df = spark.createDataFrame(Seq(
  (0, "a1", "b1", "c1", "d1"),
  (1, "a2", "b2", "c2", "d2"),
  (2, "a3", "b3", null, "d3"),
  (3, "a4", null, "c4", "d4"),
  (4, null, "b5", "c5", "d5")
)).toDF("id", "col1", "col2", "col3", "col4")

+---+----+----+----+----+
| id|col1|col2|col3|col4|
+---+----+----+----+----+
|  0|  a1|  b1|  c1|  d1|
|  1|  a2|  b2|  c2|  d2|
|  2|  a3|  b3|null|  d3|
|  3|  a4|null|  c4|  d4|
|  4|null|  b5|  c5|  d5|
+---+----+----+----+----+

1) Creating the filter condition manually, i.e. using the DataFrame where or filter function

import org.apache.spark.sql.functions.col
df.filter(col("col1").isNotNull && col("col2").isNotNull).show

or

df.where("col1 is not null and col2 is not null").show

Result:

+---+----+----+----+----+
| id|col1|col2|col3|col4|
+---+----+----+----+----+
|  0|  a1|  b1|  c1|  d1|
|  1|  a2|  b2|  c2|  d2|
|  2|  a3|  b3|null|  d3|
+---+----+----+----+----+

2) Creating the filter condition dynamically: this is useful when no column may contain a null value and there are a large number of columns, which is mostly the case.

Writing the filter condition by hand in these cases wastes a lot of time. The code below includes all columns dynamically, using map and reduce over the DataFrame's columns:

val filterCond = df.columns.map(x=>col(x).isNotNull).reduce(_ && _)

How filterCond looks:

filterCond: org.apache.spark.sql.Column = (((((id IS NOT NULL) AND (col1 IS NOT NULL)) AND (col2 IS NOT NULL)) AND (col3 IS NOT NULL)) AND (col4 IS NOT NULL))

Filtering:

val filteredDf = df.filter(filterCond)

Result:

+---+----+----+----+----+
| id|col1|col2|col3|col4|
+---+----+----+----+----+
|  0|  a1|  b1|  c1|  d1|
|  1|  a2|  b2|  c2|  d2|
+---+----+----+----+----+
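
The same map/reduce pattern can also be restricted to a subset of columns. Here is a minimal sketch, assuming only col1 and col2 must be non-null; the required list, the object name, and the sample rows are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical, self-contained sketch; names and data are illustrative only.
object RequiredColumnsFilter {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("required-columns").getOrCreate()
    import spark.implicits._

    val df = Seq[(Int, String, String, String)](
      (0, "a1", "b1", "c1"),
      (1, "a2", null, "c2"),
      (2, "a3", "b3", null)
    ).toDF("id", "col1", "col2", "col3")

    // Assumption: only these columns are required to be non-null.
    val required = Seq("col1", "col2")
    val filterCond = required.map(c => col(c).isNotNull).reduce(_ && _)

    // Row 1 is dropped (col2 is null); row 2 stays because col3 is not checked.
    df.filter(filterCond).show()

    spark.stop()
  }
}
```

Building the condition from a list of names keeps the filter in one place when the set of required columns changes.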

Ayush Vatsyayan answered Sep 30 '22 06:09


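A good solution for me was to drop the rows with any null values:

Dataset<Row> filtered = df.filter((FilterFunction<Row>) row -> !row.anyNull());

To keep only the rows that do contain a null, call row.anyNull() without the negation. (Spark 2.1.0 using the Java API; FilterFunction is org.apache.spark.api.java.function.FilterFunction.)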

chAlexey answered Sep 30 '22 05:09


The following line works well:

test.filter("friend_id is not null")

KayV answered Sep 30 '22 06:09


Following the hint from Michael Kopaniov, this works:

df.where(df("id").isNotNull).show

Robin Wang answered Sep 30 '22 05:09


Here is a solution for Spark in Java. To select the rows that contain nulls, given a Dataset<Row> named data:

Dataset<Row> containingNulls = data.where(data.col("COLUMN_NAME").isNull());

To keep only the rows without nulls:

Dataset<Row> withoutNulls = data.where(data.col("COLUMN_NAME").isNotNull());

DataFrames often contain String columns that hold empty strings like "" instead of nulls. To filter those out as well:

Dataset<Row> withoutNullsAndEmpty = data.where(data.col("COLUMN_NAME").isNotNull().and(data.col("COLUMN_NAME").notEqual("")));
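
For readers using the Scala API, a rough equivalent of the null-and-empty filter could look like the sketch below. The three sample rows and the object name are hypothetical, and =!= is the Column inequality operator available from Spark 2.0.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical, self-contained sketch; names and data are illustrative only.
object NonBlankFilter {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("non-blank").getOrCreate()
    import spark.implicits._

    // One real value, one null, one empty string.
    val data = Seq[(Int, String)]((1, "a"), (2, null), (3, "")).toDF("id", "COLUMN_NAME")

    // Keep rows whose value is neither null nor the empty string.
    val withoutNullsAndEmpty =
      data.filter($"COLUMN_NAME".isNotNull && $"COLUMN_NAME" =!= "")
    withoutNullsAndEmpty.show()

    spark.stop()
  }
}
```

The explicit isNotNull check is kept for readability, mirroring the Java version above.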

Andrushenko Alexander answered Sep 30 '22 06:09


For the first question, the result is correct: you are filtering out nulls, hence the count is zero.

For the second, the replacement, use something like the following:

val options = Map("path" -> "...\\ex.csv", "header" -> "true")
// SQLContext.load was removed in Spark 2.0; the DataFrameReader picks up the "path" option instead
val dfNull = spark.read.format("csv").options(options).load()

scala> dfNull.show

+----------+----------+-------+--------+----------+-----------+---------+
|   user_id|  event_id|invited|day_diff|interested|event_owner|friend_id|
+----------+----------+-------+--------+----------+-----------+---------+
|   4236494| 110357109|      0|      -1|         0|  937597069|     null|
|  78065188| 498404626|      0|       0|         0| 2904922087|     null|
| 282487230|2520855981|      0|      28|         0| 3749735525|     null|
| 335269852|1641491432|      0|       2|         0| 1490350911|     null|
| 437050836|1238456614|      0|       2|         0|  991277599|     null|
| 447244169|2095085551|      0|      -1|         0| 1579858878|        a|
| 516353916|1076364848|      0|       3|         1| 3597645735|        b|
| 528218683|1151525474|      0|       1|         0| 3433080956|        c|
| 531967718|3632072502|      0|       1|         0| 3863085861|     null|
| 627948360|2823119321|      0|       0|         0| 4092665803|     null|
| 811791433|3513954032|      0|       2|         0|  415464198|     null|
| 830686203|  99027353|      0|       0|         0| 3549822604|     null|
|1008893291|1115453150|      0|       2|         0| 2245155244|     null|
|1239364869|2824096896|      0|       2|         1| 2579294650|        d|
|1287950172|1076364848|      0|       0|         0| 3597645735|     null|
|1345896548|2658555390|      0|       1|         0| 2025118823|     null|
|1354205322|2564682277|      0|       3|         0| 2563033185|     null|
|1408344828|1255629030|      0|      -1|         1|  804901063|     null|
|1452633375|1334001859|      0|       4|         0| 1488588320|     null|
|1625052108|3297535757|      0|       3|         0| 1972598895|     null|
+----------+----------+-------+--------+----------+-----------+---------+

// null becomes 0 and any other value becomes 1, as asked in the question
dfNull.withColumn("friend_id", when($"friend_id".isNull, 0).otherwise(1)).show

+----------+----------+-------+--------+----------+-----------+---------+
|   user_id|  event_id|invited|day_diff|interested|event_owner|friend_id|
+----------+----------+-------+--------+----------+-----------+---------+
|   4236494| 110357109|      0|      -1|         0|  937597069|        0|
|  78065188| 498404626|      0|       0|         0| 2904922087|        0|
| 282487230|2520855981|      0|      28|         0| 3749735525|        0|
| 335269852|1641491432|      0|       2|         0| 1490350911|        0|
| 437050836|1238456614|      0|       2|         0|  991277599|        0|
| 447244169|2095085551|      0|      -1|         0| 1579858878|        1|
| 516353916|1076364848|      0|       3|         1| 3597645735|        1|
| 528218683|1151525474|      0|       1|         0| 3433080956|        1|
| 531967718|3632072502|      0|       1|         0| 3863085861|        0|
| 627948360|2823119321|      0|       0|         0| 4092665803|        0|
| 811791433|3513954032|      0|       2|         0|  415464198|        0|
| 830686203|  99027353|      0|       0|         0| 3549822604|        0|
|1008893291|1115453150|      0|       2|         0| 2245155244|        0|
|1239364869|2824096896|      0|       2|         1| 2579294650|        1|
|1287950172|1076364848|      0|       0|         0| 3597645735|        0|
|1345896548|2658555390|      0|       1|         0| 2025118823|        0|
|1354205322|2564682277|      0|       3|         0| 2563033185|        0|
|1408344828|1255629030|      0|      -1|         1|  804901063|        0|
|1452633375|1334001859|      0|       4|         0| 1488588320|        0|
|1625052108|3297535757|      0|       3|         0| 1972598895|        0|
+----------+----------+-------+--------+----------+-----------+---------+

mputha answered Sep 30 '22 06:09


val df = Seq(
  ("1001", "1007"),
  ("1002", null),
  ("1003", "1005"),
  (null, "1006")
).toDF("user_id", "friend_id")

Data is:

+-------+---------+
|user_id|friend_id|
+-------+---------+
|   1001|     1007|
|   1002|     null|
|   1003|     1005|
|   null|     1006|
+-------+---------+

Drop rows containing any null or NaN values in the specified columns of the Seq:

df.na.drop(Seq("friend_id"))
  .show()

Output:

+-------+---------+
|user_id|friend_id|
+-------+---------+
|   1001|     1007|
|   1003|     1005|
|   null|     1006|
+-------+---------+

If you do not specify columns, a row is dropped as long as any of its columns contains a null or NaN value:

df.na.drop()
  .show()

Output:

+-------+---------+
|user_id|friend_id|
+-------+---------+
|   1001|     1007|
|   1003|     1005|
+-------+---------+

Wangbo answered Sep 30 '22 06:09


Another easy way to filter out null values across multiple columns in a Spark DataFrame. Note that the null checks are AND-connected: a row is removed only when all of the listed columns are null.

df.filter(" COALESCE(col1, col2, col3, col4, col5, col6) IS NOT NULL")

If you need to filter out rows that contain any null (OR-connected), use

df.na.drop()
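
The difference between the two cases can be checked on a tiny example. This is a sketch with a made-up three-row DataFrame and a hypothetical object name; it also shows na.drop("all", ...) as one possible way to express the all-null case through the same na API.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical, self-contained sketch; names and data are illustrative only.
object AnyVersusAllNull {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("any-vs-all").getOrCreate()
    import spark.implicits._

    val df = Seq[(String, String)](
      ("x", "y"),    // no nulls
      ("x", null),   // one null
      (null, null)   // all nulls
    ).toDF("col1", "col2")

    // Drops only the all-null row: 2 rows remain.
    df.filter("COALESCE(col1, col2) IS NOT NULL").show()

    // Same idea through the na API: drop a row only if all listed columns are null.
    df.na.drop("all", Seq("col1", "col2")).show()

    // Drops every row containing at least one null: 1 row remains.
    df.na.drop().show()

    spark.stop()
  }
}
```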

Erkan Şirin answered Sep 30 '22 06:09