I need to join a DataFrame that has a string column to one that has an array-of-string column, so that whenever one of the values in the array matches, the rows join.
I tried this, but I guess it's not supported. Any other way to do this?
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._

val left = spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF("col1")
val right = spark.sparkContext.parallelize(Seq((Array(1, 2), "Yes"), (Array(3), "No"))).toDF("col1", "col2")

// Equi-join on "col1" fails: left's col1 is an int, right's col1 is an array
left.join(right, "col1")
Throws:
org.apache.spark.sql.AnalysisException: cannot resolve '(col1 = col1)' due to data type mismatch: differing types in '(col1 = col1)' (int and array).;;
The most succinct way to do this is to use the array_contains Spark SQL expression, as shown below. That said, I've compared its performance with that of the explode-and-join approach from a previous answer, and the explode seems more performant.
import org.apache.spark.sql.functions.expr
import spark.implicits._

val left = Seq(1, 2, 3).toDF("col1")
val right = Seq((Array(1, 2), "Yes"), (Array(3), "No")).toDF("col1", "col2").withColumnRenamed("col1", "col1_array")

// Join on a SQL expression that checks array membership
val joined = left.join(right, expr("array_contains(col1_array, col1)"))
joined.show
+----+----------+----+
|col1|col1_array|col2|
+----+----------+----+
| 1| [1, 2]| Yes|
| 2| [1, 2]| Yes|
| 3| [3]| No|
+----+----------+----+
Note that you can't use the org.apache.spark.sql.functions.array_contains function directly here, as it requires the second argument to be a literal rather than a column expression.
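For comparison, here is a minimal sketch of the explode-and-join variant mentioned above, reusing left and right from the snippet in this answer; the exploded column's name is an assumption, chosen to match the left-hand key:
import org.apache.spark.sql.functions.{col, explode}

// One row per array element, named "col1" to match the left side's key,
// which turns the membership test into a plain equi-join
val exploded = right.withColumn("col1", explode(col("col1_array")))
left.join(exploded, "col1").show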
One option is to create a UDF for building your join condition:
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

val left = spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF("col1")
val right = spark.sparkContext.parallelize(Seq((Array(1, 2), "Yes"), (Array(3), "No"))).toDF("col1", "col2")

// UDF that tests whether the array column contains the scalar value
val checkValue = udf {
  (array: WrappedArray[Int], value: Int) => array.contains(value)
}

val result = left.join(right, checkValue(right("col1"), left("col1")), "inner")
result.show
+----+------+----+
|col1| col1|col2|
+----+------+----+
| 1|[1, 2]| Yes|
| 2|[1, 2]| Yes|
| 3| [3]| No|
+----+------+----+
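As a side note, a sketch assuming a reasonably recent Spark version: Spark hands array columns to Scala UDFs as a Seq, so the parameter can be typed as Seq[Int] and the explicit WrappedArray import dropped. The name checkValueSeq is hypothetical; the behavior matches checkValue above.
// Same membership test, typed against Seq[Int] instead of WrappedArray
val checkValueSeq = udf { (array: Seq[Int], value: Int) => array.contains(value) }
left.join(right, checkValueSeq(right("col1"), left("col1")), "inner").show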