 

Spark: join DataFrames with a one-to-many relationship

Tags:

apache-spark

Let's take the following toy problem. I have these case classes:

case class Order(id: String, name: String, status: String)
case class TruncatedOrder(id: String)
case class Org(name: String, ord: Seq[TruncatedOrder])

I've now defined the following variables:

import spark.implicits._ // provides .toDS(); spark-shell imports this automatically
val ordersDF = Seq(Order("or1", "stuff", "shipped"), Order("or2", "thigns", "delivered"), Order("or3", "thingamabobs", "never received"), Order("or4", "???", "what?")).toDS()
val orgsDF = Seq(Org("tupper", Seq(TruncatedOrder("or1"), TruncatedOrder("or2"), TruncatedOrder("or3"))), Org("ware", Seq(TruncatedOrder("or3"), TruncatedOrder("or4")))).toDS()

What I would like is one data point per org that looks like this:
Ord("tupper", Array(Joined("or1", "stuff", "shipped"), Joined("or2", "things", "delivered"), ...))

I'm wondering how to write the join and filter statements.

asked Nov 08 '16 00:11 by TheM00s3


People also ask

How do I join multiple DataFrames in Spark?

To join multiple tables, chain inner joins. The inner join is Spark's default and most commonly used join type: it joins two DataFrames/Datasets on key columns, and rows whose keys don't match are dropped from both datasets.
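To make the chaining concrete, here is a minimal sketch that runs in spark-shell (or a Scala 2 app with spark-sql on the classpath). The tables `customers`, `orders` and `shipments` and their columns are hypothetical. Each `join` call defaults to an inner join, so a row survives only if its key matches at every step.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration; in spark-shell getOrCreate() returns the existing session
val spark = SparkSession.builder().master("local[*]").appName("multi-join-sketch").getOrCreate()
import spark.implicits._

// Hypothetical tables, chosen only to show the chaining pattern
val customers = Seq((1, "alice"), (2, "bob"), (3, "carol")).toDF("custId", "custName")
val orders    = Seq((10, 1), (11, 1), (12, 2), (13, 4)).toDF("orderId", "custId")
val shipments = Seq((10, "shipped"), (12, "delivered")).toDF("orderId", "status")

// Both joins are inner (the default); joining on a Seq of column names keeps one copy of the key.
// carol has no orders, order 13 has no customer, order 11 has no shipment, so only 10 and 12 survive.
val joined = customers
  .join(orders, Seq("custId"))
  .join(shipments, Seq("orderId"))

joined.orderBy("orderId").show()
```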

What is a left anti join in Spark?

A left anti join (in Spark SQL, and equally in PySpark) returns only the rows of the left DataFrame that have no match in the right DataFrame, and only the left DataFrame's columns.
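A small sketch of `left_anti` under the same assumptions (spark-shell or a local SparkSession). The `allOrders` and `shipped` tables are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("left-anti-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data: which order ids have no shipment record?
val allOrders = Seq(("or1", "stuff"), ("or2", "thigns"), ("or3", "thingamabobs")).toDF("id", "name")
val shipped   = Seq(("or1", "shipped"), ("or3", "shipped")).toDF("id", "status")

// Keeps only left rows without a match on the right; the result carries only allOrders' columns
val notShipped = allOrders.join(shipped, Seq("id"), "left_anti")

notShipped.show()
```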

What are the different types of joins in Spark?

Spark SQL supports several join types: inner, cross, left outer, right outer, full outer, left semi, and left anti. Which one to use depends on the business use case. Some joins, such as cross joins, are expensive in memory and computation.
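To compare the join types side by side, the sketch below passes each join-type string to `Dataset.join(right, usingColumns, joinType)` on two hypothetical three-row tables and counts the result rows. `crossJoin` is shown separately because it takes no join key. It assumes spark-shell or a local SparkSession.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("join-types-sketch").getOrCreate()
import spark.implicits._

// Hypothetical tables: keys 1 and 2 match, 3 exists only on the left, 4 only on the right
val left  = Seq((1, "a"), (2, "b"), (3, "c")).toDF("k", "l")
val right = Seq((1, "x"), (2, "y"), (4, "z")).toDF("k", "r")

// Join-type strings accepted by Dataset.join
val joinTypes = Seq("inner", "left_outer", "right_outer", "full_outer", "left_semi", "left_anti")
val rowCounts: Map[String, Long] =
  joinTypes.map(t => t -> left.join(right, Seq("k"), t).count()).toMap

// A cross join pairs every left row with every right row (3 x 3 here)
val crossCount = left.crossJoin(right).count()

rowCounts.foreach { case (t, n) => println(s"$t: $n") }
println(s"cross: $crossCount")
```

With keys 1 and 2 shared, this gives inner 2, left_outer 3, right_outer 3, full_outer 4, left_semi 2, left_anti 1, and cross 9, which shows how quickly a cross join grows compared with the keyed joins.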


1 Answer

Here is how I got the data into the format I wanted. This answer draws heavily on the answers by @ulrich and @Mariusz.

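import org.apache.spark.sql.functions.{collect_set, explode, udf}
import spark.implicits._

// Packs one order's fields into a single array<string> column
val ud = udf((col: String, name: String, status: String) => Seq(col, name, status))

orgsDF
  .select($"name".as("ordName"), explode($"ord.id"))            // one row per (org, order id); exploded column is "col"
  .join(ordersDF, $"col" === $"id").drop($"id")                 // attach name and status from ordersDF
  .select($"ordName", ud($"col", $"name", $"status").as("order")) // alias so the next step can refer to "order"
  .groupBy($"ordName")
  .agg(collect_set($"order").as("orders"))                       // gather each org's orders into one array
  .show(false)                                                    // false = don't truncate the wide column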

    +-------+--------------------------------------------------------------------------------------------------------------------------+
    |ordName|orders                                                                                                                    |
    +-------+--------------------------------------------------------------------------------------------------------------------------+
    |ware   |[WrappedArray(or4, ???, what?), WrappedArray(or3, thingamabobs, never received)]                                          |
    |tupper |[WrappedArray(or1, stuff, shipped), WrappedArray(or2, thigns, delivered), WrappedArray(or3, thingamabobs, never received)]|
    +-------+--------------------------------------------------------------------------------------------------------------------------+
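An alternative sketch that produces the nested shape asked for in the question without a UDF: after exploding and joining, the built-in `struct` and `collect_list` functions keep each order as a record rather than an array of strings, and `.as[...]` turns the result into a Dataset of a case class. `OrgWithOrders` is a hypothetical result type standing in for the question's `Ord`/`Joined`, and the code assumes spark-shell on Scala 2, where REPL-defined case classes get encoders automatically.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, explode, struct}

// Case classes from the question, plus a hypothetical result type for the nested shape
case class Order(id: String, name: String, status: String)
case class TruncatedOrder(id: String)
case class Org(name: String, ord: Seq[TruncatedOrder])
case class OrgWithOrders(name: String, orders: Seq[Order])

val spark = SparkSession.builder().master("local[*]").appName("one-to-many-sketch").getOrCreate()
import spark.implicits._

val ordersDS = Seq(Order("or1", "stuff", "shipped"), Order("or2", "thigns", "delivered"),
  Order("or3", "thingamabobs", "never received"), Order("or4", "???", "what?")).toDS()
val orgsDS = Seq(Org("tupper", Seq(TruncatedOrder("or1"), TruncatedOrder("or2"), TruncatedOrder("or3"))),
  Org("ware", Seq(TruncatedOrder("or3"), TruncatedOrder("or4")))).toDS()

val nested = orgsDS
  .select($"name".as("orgName"), explode($"ord.id").as("orderId"))   // one row per (org, order id)
  .join(ordersDS, $"orderId" === $"id")                              // attach each order's full record
  .groupBy($"orgName")
  .agg(collect_list(struct($"id", $"name", $"status")).as("orders")) // nest orders as structs
  .select($"orgName".as("name"), $"orders")
  .as[OrgWithOrders]                                                 // fields are matched by name

nested.show(false)
```

`collect_list` keeps duplicates whereas `collect_set` (used in the answer above) removes them; neither guarantees the order of elements within the array.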
answered Sep 30 '22 19:09 by TheM00s3