How to use Dataset to groupBy

Here is how I currently do it with an RDD:

val test = Seq(("New York", "Jack"),
    ("Los Angeles", "Tom"),
    ("Chicago", "David"),
    ("Houston", "John"),
    ("Detroit", "Michael"),
    ("Chicago", "Andrew"),
    ("Detroit", "Peter"),
    ("Detroit", "George")
  )
sc.parallelize(test).groupByKey().mapValues(_.toList).foreach(println)

The result is:

(New York,List(Jack))
(Detroit,List(Michael, Peter, George))
(Los Angeles,List(Tom))
(Houston,List(John))
(Chicago,List(David, Andrew))

How can I do the same with a Dataset in Spark 2.0?

I know I could do it with a custom function, but that feels complicated. Is there a simpler, built-in way?

asked Jun 07 '17 by monkeysjourney




1 Answer

I would suggest you start by creating a case class:

case class Monkey(city: String, firstName: String)

This case class should be defined outside your main class (at the top level), so that Spark can derive an encoder for it. Then you can simply call the toDS function and use groupBy with the aggregation function collect_list, as below:

import sqlContext.implicits._            // in Spark 2.0 you can also use spark.implicits._ from a SparkSession
import org.apache.spark.sql.functions._  // brings collect_list into scope

val test = Seq(("New York", "Jack"),
  ("Los Angeles", "Tom"),
  ("Chicago", "David"),
  ("Houston", "John"),
  ("Detroit", "Michael"),
  ("Chicago", "Andrew"),
  ("Detroit", "Peter"),
  ("Detroit", "George")
)
sc.parallelize(test)
  .map(row => Monkey(row._1, row._2))
  .toDS()
  .groupBy("city")
  .agg(collect_list("firstName") as "list")
  .show(false)

The output will be:

+-----------+------------------------+
|city       |list                    |
+-----------+------------------------+
|Los Angeles|[Tom]                   |
|Detroit    |[Michael, Peter, George]|
|Chicago    |[David, Andrew]         |
|Houston    |[John]                  |
|New York   |[Jack]                  |
+-----------+------------------------+
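
As a side note, if you would rather stay in the typed Dataset API instead of falling back to untyped DataFrame columns, Spark 2.0 also offers groupByKey with mapGroups. A minimal sketch, assuming the same Monkey case class and imports as above (the variable name grouped is mine):

// groupByKey keeps the typed API end to end;
// mapGroups folds each group into a (city, names) pair
val grouped = sc.parallelize(test)
  .map(row => Monkey(row._1, row._2))
  .toDS()
  .groupByKey(_.city)
  .mapGroups((city, monkeys) => (city, monkeys.map(_.firstName).toList))

grouped.show(false)

The result columns here are named _1 and _2; call .toDF("city", "list") if you want the same headers as above.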

You can always convert back to an RDD by calling the .rdd function.
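
For example, a small sketch reusing the pipeline above (the exact Row rendering, e.g. WrappedArray, may vary by Spark version):

val df = sc.parallelize(test)
  .map(row => Monkey(row._1, row._2))
  .toDS()
  .groupBy("city")
  .agg(collect_list("firstName") as "list")

df.rdd.foreach(println)   // prints Rows like [Detroit,WrappedArray(Michael, Peter, George)]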

answered Nov 12 '22 by Ramesh Maharjan