
Spark: Join dataframe column with an array

I have two DataFrames with two columns

  • df1 with schema (key1:Long, Value)

  • df2 with schema (key2:Array[Long], Value)

I need to join these DataFrames on the key columns, matching each key1 against the values inside key2. The problem is that the two key columns do not have the same type. Is there a way to do this?

asked Jan 11 '17 15:01 by syl




2 Answers

The best way to do this (and one that requires no casting or exploding of the DataFrames) is to use the array_contains Spark SQL expression in the join condition, as shown below.

import org.apache.spark.sql.functions.expr
import spark.implicits._

val df1 = Seq((1L,"one.df1"), (2L,"two.df1"),(3L,"three.df1")).toDF("key1","Value")

val df2 = Seq((Array(1L,1L),"one.df2"), (Array(2L,2L),"two.df2"), (Array(3L,3L),"three.df2")).toDF("key2","Value")

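// Keep the joined DataFrame in a val; calling .show in the same statement would assign Unit instead
val joined = df1.join(df2, expr("array_contains(key2, key1)"))
joined.show()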

+----+---------+------+---------+
|key1|    Value|  key2|    Value|
+----+---------+------+---------+
|   1|  one.df1|[1, 1]|  one.df2|
|   2|  two.df1|[2, 2]|  two.df2|
|   3|three.df1|[3, 3]|three.df2|
+----+---------+------+---------+

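Please note that, in the Spark version this answer was written for, you cannot use the org.apache.spark.sql.functions.array_contains function directly, because it requires its second argument to be a literal rather than a column expression. Writing the condition as a SQL string through expr avoids that restriction. Later releases may accept a column there, so check the API documentation for your version.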
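
If the array_contains condition turns out to be slow on large inputs (it is not an equality condition, so Spark may not be able to plan it as a hash or sort-merge join), a commonly used alternative is to explode the array and join on equality instead. The following is only a sketch of that alternative, not part of the answer above. It assumes Spark 2.4 or later (for array_distinct) and a local session; the object name, the app name, the helper column "key", and the renamed columns Value1 and Value2 are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_distinct, explode}

object ExplodeJoinSketch {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only; the app name is arbitrary.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("explode-join-sketch")
      .getOrCreate()
    import spark.implicits._

    // Same data as above, with distinct value column names to avoid ambiguity after the join.
    val df1 = Seq((1L, "one.df1"), (2L, "two.df1"), (3L, "three.df1"))
      .toDF("key1", "Value1")
    val df2 = Seq((Array(1L, 1L), "one.df2"), (Array(2L, 2L), "two.df2"), (Array(3L, 3L), "three.df2"))
      .toDF("key2", "Value2")

    // One row per distinct array element; array_distinct keeps a repeated
    // element such as [1, 1] from producing duplicate join matches.
    val exploded = df2.withColumn("key", explode(array_distinct($"key2")))

    // Plain equality join on the exploded element, then drop the helper column.
    val joined = df1.join(exploded, $"key1" === $"key").drop("key")
    joined.show()

    spark.stop()
  }
}
```

Whether this is actually faster depends on the data: exploding multiplies the rows of df2 by the array length, so it trades a larger input for a cheaper join strategy.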

answered Sep 28 '22 17:09 by randal25


You can cast both key1 and key2 to strings and then use the contains method of Column, as follows.

import org.apache.spark.sql.functions.col
import spark.implicits._  // needed for toDF

val df1 = sc.parallelize(Seq((1L,"one.df1"),
                             (2L,"two.df1"),
                             (3L,"three.df1"))).toDF("key1","Value")

DF1:
+----+---------+
|key1|Value    |
+----+---------+
|1   |one.df1  |
|2   |two.df1  |
|3   |three.df1|
+----+---------+

val df2 = sc.parallelize(Seq((Array(1L,1L),"one.df2"),
                             (Array(2L,2L),"two.df2"),
                             (Array(3L,3L),"three.df2"))).toDF("key2","Value")
DF2:
+------+---------+
|key2  |Value    |
+------+---------+
|[1, 1]|one.df2  |
|[2, 2]|two.df2  |
|[3, 3]|three.df2|
+------+---------+

// The result is a DataFrame, not an RDD
val joined = df1.join(df2, col("key2").cast("string").contains(col("key1").cast("string")))

JOIN:
+----+---------+------+---------+
|key1|Value    |key2  |Value    |
+----+---------+------+---------+
|1   |one.df1  |[1, 1]|one.df2  |
|2   |two.df1  |[2, 2]|two.df2  |
|3   |three.df1|[3, 3]|three.df2|
+----+---------+------+---------+
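
One thing to keep in mind with the string-cast condition: contains is a substring test, so it can match keys that are not actually elements of the array. The plain-Scala sketch below mimics the two checks without Spark. It assumes the cast renders the array as text in the form shown in the tables above (for example [10, 21]); the object and function names are hypothetical.

```scala
object SubstringPitfall {
  // Mimics the string-cast join condition: key2 rendered as text,
  // key1 searched for as a substring of that text.
  def stringMatch(key1: Long, key2: Seq[Long]): Boolean =
    key2.mkString("[", ", ", "]").contains(key1.toString)

  // Element-wise membership, which is what array_contains checks.
  def elementMatch(key1: Long, key2: Seq[Long]): Boolean =
    key2.contains(key1)

  def main(args: Array[String]): Unit = {
    val key2 = Seq(10L, 21L)
    println(stringMatch(1L, key2))   // true: "1" is a substring of "[10, 21]"
    println(elementMatch(1L, key2))  // false: 1 is not an element of the array
  }
}
```

With the sample data in this answer every key is a single digit that appears only in its own array, so both checks agree; with multi-digit keys they can diverge.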
answered Sep 28 '22 18:09 by pheeleeppoo