I have two DataFrames, each with two columns:
df1
with schema (key1: Long, Value)
df2
with schema (key2: Array[Long], Value)
I need to join these DataFrames on the key columns (find the matching values between key1
and the values in key2
). The problem is that the keys don't have the same type. Is there a way to do this?
The best way to do this (and the one that doesn't require any casting or exploding of the DataFrames) is to use the array_contains
Spark SQL expression, as shown below.
import org.apache.spark.sql.functions.expr
import spark.implicits._

val df1 = Seq((1L, "one.df1"), (2L, "two.df1"), (3L, "three.df1")).toDF("key1", "Value")
val df2 = Seq((Array(1L, 1L), "one.df2"), (Array(2L, 2L), "two.df2"), (Array(3L, 3L), "three.df2")).toDF("key2", "Value")

// Join on "key2 contains key1" expressed as a SQL expression
val joined = df1.join(df2, expr("array_contains(key2, key1)"))
joined.show()
+----+---------+------+---------+
|key1| Value| key2| Value|
+----+---------+------+---------+
| 1| one.df1|[1, 1]| one.df2|
| 2| two.df1|[2, 2]| two.df2|
| 3|three.df1|[3, 3]|three.df2|
+----+---------+------+---------+
Please note that you cannot use the org.apache.spark.sql.functions.array_contains
function directly as it requires the second argument to be a literal as opposed to a column expression.
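For reference, the explode-based alternative alluded to above would look roughly like the sketch below (assuming the same df1 and df2 and imports as above). It flattens key2 into one row per element before joining on equality, so an array like [1, 1] produces duplicate matches that need to be dropped afterwards.

import org.apache.spark.sql.functions.explode

// Sketch: one row per element of key2, join on equality, then deduplicate
val exploded = df2.withColumn("key2_elem", explode($"key2"))
val joinedViaExplode = df1
  .join(exploded, $"key1" === $"key2_elem")
  .drop("key2_elem")
  .distinct()
joinedViaExplode.show()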
You can cast the types of key1 and key2 and then use the contains function, as follows.
val df1 = sc.parallelize(Seq((1L,"one.df1"),
(2L,"two.df1"),
(3L,"three.df1"))).toDF("key1","Value")
DF1:
+----+---------+
|key1|Value |
+----+---------+
|1 |one.df1 |
|2 |two.df1 |
|3 |three.df1|
+----+---------+
val df2 = sc.parallelize(Seq((Array(1L,1L),"one.df2"),
(Array(2L,2L),"two.df2"),
(Array(3L,3L),"three.df2"))).toDF("key2","Value")
DF2:
+------+---------+
|key2 |Value |
+------+---------+
|[1, 1]|one.df2 |
|[2, 2]|two.df2 |
|[3, 3]|three.df2|
+------+---------+
import org.apache.spark.sql.functions.col

// Cast both keys to string and check substring containment
val joined = df1.join(df2, col("key2").cast("string").contains(col("key1").cast("string")))
joined.show(false)
JOIN:
+----+---------+------+---------+
|key1|Value |key2 |Value |
+----+---------+------+---------+
|1 |one.df1 |[1, 1]|one.df2 |
|2 |two.df1 |[2, 2]|two.df2 |
|3 |three.df1|[3, 3]|three.df2|
+----+---------+------+---------+
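Note that with either approach the result carries two columns named Value, one from each DataFrame. If you need to reference them after the join, it may help to rename them up front; a minimal sketch, using the df1/df2 defined above:

// Hypothetical renaming to avoid ambiguous "Value" references after the join
val joined2 = df1.withColumnRenamed("Value", "Value1")
  .join(df2.withColumnRenamed("Value", "Value2"),
        col("key2").cast("string").contains(col("key1").cast("string")))
joined2.select("key1", "Value1", "key2", "Value2").show(false)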