How would you perform basic joins in Spark using python? In R you could use merg() to do this. What is the syntax using python on spark for:
With two tables (RDD) with a single column in each that has a common key.
RDD(1):(key,U)
RDD(2):(key,V)
I think an inner join is something like this:
rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));
Is that right? I have searched the internet and can't find a good example of joins. Thanks in advance.
join(other, numPartitions = None) It returns RDD with a pair of elements with the matching keys and all the values for that particular key. In the following example, there are two pair of elements in two different RDDs. After joining these two RDDs, we get an RDD with elements having matching keys and their values.
Summary: Pyspark DataFrames have a join method which takes three parameters: DataFrame on the right side of the join, Which fields are being joined on, and what type of join (inner, outer, left_outer, right_outer, leftsemi). You call the join method from the left side DataFrame object such as df1. join(df2, df1.
In order to explain join with multiple tables, we will use Inner join, this is the default join in Spark and it's mostly used, this joins two DataFrames/Datasets on key columns, and where keys don't match the rows get dropped from both datasets.
It can be done either using PairRDDFunctions
or Spark Data Frames. Since data frame operations benefit from Catalyst Optimizer the second option is worth considering.
Assuming your data looks as follows:
rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])
Inner join:
rdd1.join(rdd2)
Left outer join:
rdd1.leftOuterJoin(rdd2)
Cartesian product (doesn't require RDD[(T, U)]
):
rdd1.cartesian(rdd2)
Broadcast join (doesn't require RDD[(T, U)]
):
Finally there is cogroup
which has no direct SQL equivalent but can be useful in some situations:
cogrouped = rdd1.cogroup(rdd2)
cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]
You can use either SQL DSL or execute raw SQL using sqlContext.sql
.
df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))
# Register temporary tables to be able to use `sparkSession.sql`
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')
Inner join:
# inner is a default value so it could be omitted
df1.join(df2, df1.k == df2.k, how='inner')
spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')
Left outer join:
df1.join(df2, df1.k == df2.k, how='left_outer')
spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')
Cross join (explicit cross join or configuration changes are required in Spark. 2.0 - spark.sql.crossJoin.enabled for Spark 2.x):
df1.crossJoin(df2)
spark.sql('SELECT * FROM df1 CROSS JOIN df2')
df1.join(df2)
sqlContext.sql('SELECT * FROM df JOIN df2')
Since 1.6 (1.5 in Scala) each of these can be combined with broadcast
function:
from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), df1.k == df2.k)
to perform broadcast join. See also Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With