 

Aggregate rows of Spark DataFrame to String after groupby

I'm quite new to both Spark and Scala and could really use a hint to solve my problem. I have two DataFrames, A (columns id and name) and B (columns id and text), and would like to join them, group by id, and combine all rows of text into a single String:

A

+--------+--------+
|      id|    name|
+--------+--------+
|       0|       A|
|       1|       B|
+--------+--------+

B

+--------+--------+
|      id|    text|
+--------+--------+
|       0|     one|
|       0|     two|
|       1|   three|
|       1|    four|
+--------+--------+

desired result:

+--------+--------+----------+
|      id|    name|     texts|
+--------+--------+----------+
|       0|       A|   one two|
|       1|       B|three four|
+--------+--------+----------+

So far I'm trying the following:

import org.apache.spark.sql.functions.collect_list

val C = A.join(B, "id")
val D = C.groupBy("id", "name").agg(collect_list("text") as "texts")

This works quite well, except that my texts column is an Array of Strings instead of a single String. I would very much appreciate some help.
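For reference, the join-group-concatenate semantics I'm after can be sketched in plain Python (no Spark involved; the data just mirrors the tables above):

```python
# Plain-Python sketch of the desired join + group + concatenate semantics.
# The tuples mirror the tables above; no Spark involved.
a = [(0, "A"), (1, "B")]
b = [(0, "one"), (0, "two"), (1, "three"), (1, "four")]

texts = {}
for row_id, text in b:                      # group rows of B by id
    texts.setdefault(row_id, []).append(text)

result = [(row_id, name, " ".join(texts.get(row_id, [])))
          for row_id, name in a]
# result: [(0, "A", "one two"), (1, "B", "three four")]
```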

Peter Klauke asked Jul 04 '17 16:07


People also ask

What is AGG () in Spark?

agg(Column expr, scala.collection.Seq<Column> exprs) computes aggregates by specifying a series of aggregate columns. agg(scala.collection.immutable.Map<String,String> exprs) (Scala-specific) computes aggregates by specifying a map from column name to aggregate method.

How do I get other columns with Spark DataFrame groupBy?

Suppose you have a df that includes the columns “name” and “age”, and you want to group by these two columns. To get the other columns back after a groupBy, you can use a join: join the aggregated result back to the original DataFrame, and the joined DataFrame will have all columns, including the aggregate values.

How do you use groupBy and count in Pyspark?

When we perform groupBy() on a PySpark DataFrame, it returns a GroupedData object which provides the aggregate functions below. count() – use groupBy().count() to return the number of rows for each group. mean() – returns the mean of values for each group. max() – returns the maximum of values for each group.
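Those per-group aggregates can be illustrated without Spark, using plain Python dictionaries standing in for a GroupedData object (a sketch only, with made-up data):

```python
# Emulating count()/mean()/max() per group with plain Python.
rows = [("A", 30), ("A", 40), ("B", 20)]

groups = {}
for key, value in rows:
    groups.setdefault(key, []).append(value)

counts = {k: len(v) for k, v in groups.items()}          # like count()
means = {k: sum(v) / len(v) for k, v in groups.items()}  # like mean()
maxes = {k: max(v) for k, v in groups.items()}           # like max()
```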


2 Answers

I am just making some minor additions to yours to give the right solution, which is

import org.apache.spark.sql.functions.{collect_list, concat_ws}

A.join(B, Seq("id"), "left")
  .orderBy("id")
  .groupBy("id", "name")
  .agg(concat_ws(" ", collect_list("text")) as "texts")
Ramesh Maharjan answered Oct 20 '22 06:10


It's quite simple:

import org.apache.spark.sql.functions.collect_list

val bCollected = b.groupBy('id).agg(collect_list('text).as("texts"))
val ab = a.join(bCollected, a("id") === bCollected("id"), "left")

The first line builds an intermediate DataFrame: b with its texts collected for every id. Then you join it with a. bCollected should be smaller than b itself, so it will probably get better shuffle time.
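The size argument can be checked with a quick plain-Python sketch (illustrative only; Spark's actual shuffle behavior is more involved):

```python
# After collecting texts per id, the joined side shrinks from one row
# per text to one row per id.
b = [(0, "one"), (0, "two"), (1, "three"), (1, "four")]

b_collected = {}
for row_id, text in b:
    b_collected[row_id] = (b_collected[row_id] + " " + text) if row_id in b_collected else text

print(len(b), "rows in b;", len(b_collected), "rows after collecting")  # 4 rows in b; 2 rows after collecting
```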

T. Gawęda answered Oct 20 '22 06:10