 

Spark: Joining with array

I need to join a DataFrame that has a string column to one that has an array-of-strings column, so that the rows join whenever the string matches one of the values in the array.

I tried the following, but I guess it's not supported. Is there another way to do this?

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._

val left = spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF("col1")
val right = spark.sparkContext.parallelize(Seq((Array(1, 2), "Yes"),(Array(3),"No"))).toDF("col1", "col2")

left.join(right,"col1")

Throws:

org.apache.spark.sql.AnalysisException: cannot resolve '(col1 = col1)' due to data type mismatch: differing types in '(col1 = col1)' (int and array).;;

asked Aug 07 '17 by aclowkay


People also ask

What is Array_join in SQL?

This is the function to use when you want to concatenate all the values in an array field into a single string. You can pass an optional separator argument, which can be any string; if you do not specify a separator, nothing is added between the values.
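In Spark SQL specifically, array_join (available since Spark 2.4) takes the delimiter as an explicit second argument. A minimal sketch, using a made-up tags column:

import org.apache.spark.sql.functions.array_join
import spark.implicits._

val df = Seq((1, Array("a", "b", "c"))).toDF("id", "tags")

// Concatenate the array elements into one string, separated by ", "
df.select(array_join($"tags", ", ").as("tags_joined")).show
// +-----------+
// |tags_joined|
// +-----------+
// |    a, b, c|
// +-----------+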

How does a join work in spark?

The default join operation in Spark includes only keys that are present in both RDDs, and when a key has multiple values, it produces one output row for every combination of values for that key. A standard join works best when both RDDs contain the same set of distinct keys.
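For example, a minimal pair-RDD join sketch (the keys and values here are made up for illustration):

val a = spark.sparkContext.parallelize(Seq(("k1", 1), ("k2", 2)))
val b = spark.sparkContext.parallelize(Seq(("k1", "x"), ("k1", "y"), ("k3", "z")))

// Inner join: only "k1" exists in both RDDs; its two values on the right
// yield one output row per combination.
a.join(b).collect()
// e.g. Array((k1,(1,x)), (k1,(1,y)))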

How do you flatten an array in PySpark?

If you want to flatten the arrays, use the flatten function, which converts an array-of-arrays column into a single array column on the DataFrame.
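The question mentions PySpark, but the same function exists in the Scala API (Spark 2.4+). A minimal sketch with a made-up nested column:

import org.apache.spark.sql.functions.flatten
import spark.implicits._

val nested = Seq((1, Array(Array(1, 2), Array(3)))).toDF("id", "arr")

// Collapse the array of arrays into one flat array per row
nested.select($"id", flatten($"arr").as("flat")).show
// +---+---------+
// | id|     flat|
// +---+---------+
// |  1|[1, 2, 3]|
// +---+---------+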


2 Answers

The most succinct way to do this is to use the array_contains Spark SQL expression, as shown below. That said, I've compared its performance against doing an explode followed by a plain join (as in another answer; a sketch of that approach follows the code below), and the explode seems to be more performant.

import org.apache.spark.sql.functions.expr
import spark.implicits._

val left = Seq(1, 2, 3).toDF("col1")

val right = Seq((Array(1, 2), "Yes"),(Array(3),"No")).toDF("col1", "col2").withColumnRenamed("col1", "col1_array")

val joined = left.join(right, expr("array_contains(col1_array, col1)"))
joined.show

+----+----------+----+
|col1|col1_array|col2|
+----+----------+----+
|   1|    [1, 2]| Yes|
|   2|    [1, 2]| Yes|
|   3|       [3]|  No|
+----+----------+----+

Note that you can't use the org.apache.spark.sql.functions.array_contains function directly here, because it requires its second argument to be a literal rather than a column expression.
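For reference, the explode-and-join approach mentioned above isn't reproduced in this thread; a minimal sketch of it, reusing the same left and right DataFrames, might look like this:

import org.apache.spark.sql.functions.explode

// Turn each array element into its own row, then do an ordinary equi-join on col1
val exploded = right.withColumn("col1", explode($"col1_array"))
left.join(exploded, "col1").show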

answered Oct 19 '22 by randal25


One option is to create a UDF for building your join condition:

import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

val left = spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF("col1")
val right = spark.sparkContext.parallelize(Seq((Array(1, 2), "Yes"),(Array(3),"No"))).toDF("col1", "col2")

val checkValue = udf { 
  (array: WrappedArray[Int], value: Int) => array.contains(value) 
}
val result = left.join(right, checkValue(right("col1"), left("col1")), "inner")

result.show

+----+------+----+
|col1|  col1|col2|
+----+------+----+
|   1|[1, 2]| Yes|
|   2|[1, 2]| Yes|
|   3|   [3]|  No|
+----+------+----+

answered Oct 19 '22 by Daniel de Paula