I've got a DataFrame I'm operating on, and I want to group by a set of columns and operate per-group on the rest of the columns. In regular RDD-land I think it would look something like this:
rdd.map( tup => ((tup._1, tup._2, tup._3), tup) )
   .groupByKey()
   .foreachPartition( iter => doSomeJob(iter) )
In DataFrame-land I'd start like this:
df.groupBy("col1", "col2", "col3") // Reference by name
but then I'm not sure how to operate on the groups if my operations are more complicated than the mean/min/max/count offered by GroupedData.
For example, I want to build a single MongoDB document per ("col1", "col2", "col3") group (by iterating through the associated Rows in the group), scale down to N partitions, then insert the docs into a MongoDB database. The N limit is the max number of simultaneous connections I want.
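Concretely, the shape I have in mind is roughly the sketch below (hand-wavy; the document building and connection handling inside the closure are exactly the parts I don't know how to do from a DataFrame):

rdd.map( tup => ((tup._1, tup._2, tup._3), tup) )
   .groupByKey()      // one (key, Iterable[record]) pair per group
   .coalesce(N)       // at most N partitions, so at most N simultaneous connections
   .foreachPartition { groups =>
     // open one MongoDB connection here, build one document per (key, records)
     // entry in groups, insert it, then close the connection
     ()
   }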
Any advice?
You can do a self-join. First get the groups:
val groups = df.groupBy($"col1", $"col2", $"col3").agg($"col1", $"col2", $"col3")
Then you can join this back to the original DataFrame:
val joinedDF = groups
  .select($"col1" as "l_col1", $"col2" as "l_col2", $"col3" as "l_col3")
  .join(df, $"col1" <=> $"l_col1" and $"col2" <=> $"l_col2" and $"col3" <=> $"l_col3")
While this gets you exactly the same data you had originally (and with 3 additional, redundant columns) you could do another join to add a column with the MongoDB document ID for the (col1, col2, col3) group associated with the row.
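For example, one way to mint an ID per group and attach it to every row looks like the sketch below. This is only an illustration: doc_id is a made-up column name, monotonically_increasing_id is just one convenient way to generate the IDs, and the using-columns join assumes a reasonably recent Spark version.

import org.apache.spark.sql.functions.monotonically_increasing_id

val groupIds = df.select($"col1", $"col2", $"col3").distinct()
  .withColumn("doc_id", monotonically_increasing_id())          // one synthetic ID per group

val withDocId = df.join(groupIds, Seq("col1", "col2", "col3"))  // every row now carries its group's doc_id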
At any rate, in my experience joins and self-joins are the way you handle complicated stuff in DataFrames.
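For the write itself, the N-connection cap is the part I'd handle outside the DataFrame API: shrink the partition count, then open one connection per partition inside foreachPartition. A rough sketch with the 3.x MongoDB Java driver follows; the host, database/collection names, and the per-group payload (here just a row count) are placeholders you'd replace with your own.

import com.mongodb.MongoClient
import org.bson.Document

df.rdd
  .groupBy(row => (row.getAs[AnyRef]("col1"), row.getAs[AnyRef]("col2"), row.getAs[AnyRef]("col3")))
  .coalesce(N)                                   // at most N partitions => at most N simultaneous connections
  .foreachPartition { groups =>
    val client = new MongoClient("localhost")    // one connection per partition
    val coll = client.getDatabase("mydb").getCollection("docs")
    groups.foreach { case ((c1, c2, c3), rows) =>
      val doc = new Document("col1", c1).append("col2", c2).append("col3", c3)
        .append("rowCount", rows.size)           // stand-in for the real per-group document body
      coll.insertOne(doc)
    }
    client.close()
  }

The driver jar has to be on the executors' classpath, and whatever you put in each document needs to be a type the Mongo codecs can encode.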