
How to filter Spark dataframe by array column containing any of the values of some other dataframe/set

I have a DataFrame A that contains a column whose type is an array of strings.

...
 |-- browse: array (nullable = true)
 |    |-- element: string (containsNull = true)
...

For example three sample rows would be

+---------+--------+---------+
| column 1|  browse| column n|
+---------+--------+---------+
|     foo1| [X,Y,Z]|     bar1|
|     foo2|   [K,L]|     bar2|
|     foo3|     [M]|     bar3|

And another DataFrame B that contains a string column:

|-- browsenodeid: string (nullable = true)

Some sample rows for it would be

+------------+
|browsenodeid|
+------------+
|           A|
|           Z|
|           M|

How can I filter A so that I keep all the rows whose browse contains any of the values of browsenodeid from B? In terms of the above examples, the result would be:

+---------+--------+---------+
| column 1|  browse| column n|
+---------+--------+---------+
|     foo1| [X,Y,Z]|     bar1| <- because Z is a value of B.browsenodeid
|     foo3|     [M]|     bar3| <- because M is a value of B.browsenodeid

If I had a single value then I would use something like

A.filter(array_contains(A("browse"), single_value))

But what do I do with a list or DataFrame of values?

Vassilis Moustakas asked Feb 05 '23 11:02



1 Answer

I found an elegant solution for this, without the need to cast DataFrames/Datasets to RDDs.

Assuming you have a DataFrame dataDF:

+---------+--------+---------+
| column 1|  browse| column n|
+---------+--------+---------+
|     foo1| [X,Y,Z]|     bar1|
|     foo2|   [K,L]|     bar2|
|     foo3|     [M]|     bar3|

and an array b containing the values you want to match in browse

val b: Array[String] = Array("M", "Z")
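If the values live in a DataFrame, like B in the question, rather than in a hand-written array, one way to get such an array is to collect the column to the driver. The following is only a sketch and not part of the original answer: bDF and collected are hypothetical names, the local SparkSession is there just to make the snippet self-contained, and collecting is reasonable only when B is small enough to fit in driver memory.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell, `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("collect-values")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for DataFrame B from the question.
val bDF = Seq("A", "Z", "M").toDF("browsenodeid")

// Drop nulls and duplicates, then bring the values to the driver as a plain array.
val collected: Array[String] = bDF
  .select("browsenodeid")
  .na.drop()
  .distinct()
  .as[String]
  .collect()
```

The resulting collected array could then be used wherever b appears in this answer.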

Implement the udf:

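import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Builds a UDF that is true when the array column shares at least one element with s.
def array_contains_any(s: Seq[String]): UserDefinedFunction = {
  val wanted = s.toSet // set lookup instead of intersecting two lists per row
  // Spark passes array columns to Scala UDFs as Seq; a null array yields false.
  udf((c: Seq[String]) => c != null && c.exists(wanted.contains))
}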

and then simply use the filter or where function (with a little bit of fancy currying :P) to do the filtering like:

// $"browse" needs import spark.implicits._ (already in scope in spark-shell)
dataDF.where(array_contains_any(b)($"browse"))
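For comparison, the same filter can probably be written without a UDF. The sketch below is not part of the original answer and rests on a few assumptions: arrays_overlap is a built-in available from Spark 2.4 onward, bDF is a hypothetical stand-in for DataFrame B from the question, and the sample data is rebuilt locally so the snippet runs on its own. The second variant keeps B as a DataFrame and uses a left semi join, which avoids collecting values to the driver but is a non-equi join and may be slow when both sides are large.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, arrays_overlap, col, expr, lit}

// Local session for illustration only; in spark-shell, `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("array-contains-any")
  .getOrCreate()
import spark.implicits._

// Sample rows matching the tables in the question.
val dataDF = Seq(
  ("foo1", Seq("X", "Y", "Z"), "bar1"),
  ("foo2", Seq("K", "L"), "bar2"),
  ("foo3", Seq("M"), "bar3")
).toDF("column 1", "browse", "column n")

// Hypothetical stand-in for DataFrame B.
val bDF = Seq("A", "Z", "M").toDF("browsenodeid")

// Variant 1 (assumes Spark 2.4+): compare browse with a literal array column.
val b = Array("M", "Z")
val viaOverlap = dataDF.where(arrays_overlap(col("browse"), array(b.map(lit(_)): _*)))

// Variant 2: keep each row of dataDF that matches at least one row of bDF.
// A left semi join returns only the left-hand columns, once per matching row.
val viaJoin = dataDF.join(
  bDF,
  expr("array_contains(browse, browsenodeid)"),
  "left_semi"
)

viaOverlap.show()
viaJoin.show()
```

Both variants should keep the foo1 and foo3 rows from the sample data. Rows whose browse is null are dropped in either case, because the filter condition evaluates to null rather than true.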
Vassilis Moustakas answered Feb 06 '23 23:02
