I'm quite new both Spark and Scale and could really need a hint to solve my problem. So I have two DataFrames A (columns id and name) and B (columns id and text) would like to join them, group by id and combine all rows of text into a single String:
A
+--------+--------+
| id| name|
+--------+--------+
| 0| A|
| 1| B|
+--------+--------+
B
+--------+ -------+
| id| text|
+--------+--------+
| 0| one|
| 0| two|
| 1| three|
| 1| four|
+--------+--------+
desired result:
+--------+--------+----------+
| id| name| texts|
+--------+--------+----------+
| 0| A| one two|
| 1| B|three four|
+--------+--------+----------+
So far I'm trying the following:
var C = A.join(B, "id")
var D = C.groupBy("id", "name").agg(collect_list("text") as "texts")
This works quite well besides that my texts column is an Array of Strings instead of a String. I would appreciate some help very much.
agg(Column expr, scala.collection.Seq<Column> exprs) Compute aggregates by specifying a series of aggregate columns. DataFrame. agg(scala.collection.immutable.Map<String,String> exprs) (Scala-specific) Compute aggregates by specifying a map from column name to aggregate methods.
1 Answer. Suppose you have a df that includes columns “name” and “age”, and on these two columns you want to perform groupBY. Now, in order to get other columns also after doing a groupBy you can use join function. Now, data_joined will have all columns including the count values.
When we perform groupBy() on PySpark Dataframe, it returns GroupedData object which contains below aggregate functions. count() – Use groupBy() count() to return the number of rows for each group. mean() – Returns the mean of values for each group. max() – Returns the maximum of values for each group.
I am just adding some minor functions in yours to give the right solution, which is
A.join(B, Seq("id"), "left").orderBy("id").groupBy("id", "name").agg(concat_ws(" ", collect_list("text")) as "texts")
It's quite simple:
val bCollected = b.groupBy('id).agg(collect_list('text).as("texts")
val ab = a.join(bCollected, a("id") == bCollected("id"), "left")
First DataFrame is immediate result, b DataFrame that has texts collected for every id. Then you are joining it with a. bCollected should be smaller that b itself, so it will probably get better shuffle time
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With