
How to use Column.isin with array column in join?

case class Foo1(codes:Seq[String], name:String)
case class Foo2(code:String, description:String)

val ds1 = Seq(
  Foo1(Seq("A"),           "foo1"),
  Foo1(Seq("A", "B"),      "foo2"),
  Foo1(Seq("B", "C", "D"), "foo3"),
  Foo1(Seq("C"),           "foo4"),
  Foo1(Seq("C", "D"),      "foo5")
).toDS

val ds2 = Seq(
  Foo2("A", "product A"),
  Foo2("B", "product B"),
  Foo2("C", "product C"),
  Foo2("D", "product D"),
  Foo2("E", "product E")
).toDS

val j = ds1.join(ds2, ds2("code") isin (ds1("codes")))

Hopefully this Scala code fragment makes it clear what I'm trying to accomplish. Our data is structured so that one data set has a column containing an array of values, and I want to join each value in that array to another data set. For example, Seq("A", "B") in ds1 would join with "A" and "B" in ds2.

The "isin" operator on Column seems to be exactly what I want, and this compiles, but when I run it I get the following error:

org.apache.spark.sql.AnalysisException: cannot resolve '(code IN (codes))' due to data type mismatch: Arguments must be same type;;

Reading further, I see that isin() takes varargs ("splatted" arguments) and seems better suited to a filter(). So my question is: is this the intended use of this operator, or is there some other way to perform this type of join?

Uncle Long Hair asked Aug 03 '17 14:08



1 Answer

Please use array_contains:

ds1.crossJoin(ds2).where("array_contains(codes, code)").show

+---------+----+----+-----------+
|    codes|name|code|description|
+---------+----+----+-----------+
|      [A]|foo1|   A|  product A|
|   [A, B]|foo2|   A|  product A|
|   [A, B]|foo2|   B|  product B|
|[B, C, D]|foo3|   B|  product B|
|[B, C, D]|foo3|   C|  product C|
|[B, C, D]|foo3|   D|  product D|
|      [C]|foo4|   C|  product C|
|   [C, D]|foo5|   C|  product C|
|   [C, D]|foo5|   D|  product D|
+---------+----+----+-----------+

If you use Spark 1.x or 2.0, replace crossJoin with a standard join and, if necessary, enable cross joins in the configuration (spark.sql.crossJoin.enabled in Spark 2.0).
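As a minimal sketch of the standard-join form, the same predicate can be passed as the join condition through expr, so no crossJoin call is needed. The local SparkSession, the app name, and the reduced sample data are assumptions made for illustration; in spark-shell the session already exists. Depending on the Spark version, this may still require cross joins to be enabled.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Assumption: a local session for illustration only; spark-shell already provides `spark`.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("array-contains-join-sketch")
  .getOrCreate()
import spark.implicits._

case class Foo1(codes: Seq[String], name: String)
case class Foo2(code: String, description: String)

// Reduced sample data (illustrative subset of the question's data sets).
val ds1 = Seq(
  Foo1(Seq("A", "B"), "foo2"),
  Foo1(Seq("C"),      "foo4")
).toDS

val ds2 = Seq(
  Foo2("A", "product A"),
  Foo2("B", "product B"),
  Foo2("E", "product E")
).toDS

// Standard inner join whose condition is the array_contains predicate.
// `codes` comes from ds1 and `code` from ds2, so both names are unambiguous.
val joined = ds1.join(ds2, expr("array_contains(codes, code)"))
joined.show()
```

Because the condition is not an equality between columns, Spark most likely still has to compare every row of ds1 with every row of ds2, so this changes the syntax rather than the cost.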

It might be possible to avoid the Cartesian product with explode:

ds1.withColumn("code", explode($"codes")).join(ds2, Seq("code")).show

+----+---------+----+-----------+
|code|    codes|name|description|
+----+---------+----+-----------+
|   B|   [A, B]|foo2|  product B|
|   B|[B, C, D]|foo3|  product B|
|   D|[B, C, D]|foo3|  product D|
|   D|   [C, D]|foo5|  product D|
|   C|[B, C, D]|foo3|  product C|
|   C|      [C]|foo4|  product C|
|   C|   [C, D]|foo5|  product C|
|   A|      [A]|foo1|  product A|
|   A|   [A, B]|foo2|  product A|
+----+---------+----+-----------+
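
The explode snippet above depends on two imports that are not shown: explode from org.apache.spark.sql.functions and the $ column syntax from spark.implicits._. A self-contained sketch, assuming a local SparkSession and a reduced, illustrative subset of the data, could look like this:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

// Assumption: a local session for illustration only; spark-shell already provides `spark`.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("explode-join-sketch")
  .getOrCreate()
import spark.implicits._ // provides .toDS and the $"..." column syntax

case class Foo1(codes: Seq[String], name: String)
case class Foo2(code: String, description: String)

// Reduced sample data (illustrative subset of the question's data sets).
val ds1 = Seq(
  Foo1(Seq("A", "B"), "foo2"),
  Foo1(Seq("C", "D"), "foo5")
).toDS

val ds2 = Seq(
  Foo2("A", "product A"),
  Foo2("C", "product C"),
  Foo2("E", "product E")
).toDS

// One row per array element, keeping the original array in `codes`.
val exploded = ds1.withColumn("code", explode($"codes"))

// Equi-join on the exploded value; the shared key column `code` appears once, first.
val joined = exploded.join(ds2, Seq("code"))
joined.show()
```

Note that explode emits no rows for an empty or null array, and the inner join drops codes that have no match in ds2, so such rows disappear from the result.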
Alper t. Turker answered Nov 06 '22 07:11
