
Spark dataframes groupby into list

I am trying to do some analysis on sets. I have a sample data set that looks like this:

orders.json

{"items":[1,2,3,4,5]}
{"items":[1,2,5]}
{"items":[1,3,5]}
{"items":[3,4,5]}

Each record is a single field containing a list of numbers that represent IDs.

Here is the Spark script I am trying to run:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("Dataframe Test")

val sc = new SparkContext(sparkConf)
val sql = new SQLContext(sc)

val dataframe = sql.read.json("orders.json")

val expanded = dataframe
  .explode[::[Long], Long]("items", "item1")(row => row)
  .explode[::[Long], Long]("items", "item2")(row => row)

val grouped = expanded
  .where(expanded("item1") !== expanded("item2"))
  .groupBy("item1", "item2")
  .count()

val recs = grouped
  .groupBy("item1")

Creating expanded and grouped works fine. In a nutshell, expanded is a list of all possible ordered pairs of IDs where the two IDs appeared in the same original set. grouped filters out IDs that were paired with themselves, then groups the unique ID pairs together and produces a count for each.
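For reference, the same pairwise expansion can also be written with the explode function from org.apache.spark.sql.functions (a sketch, assuming the items column is read as an array of longs; only one generator is allowed per select, hence the two steps):

import org.apache.spark.sql.functions.{col, explode}

// Expand each order into all (item1, item2) combinations.
val pairs = dataframe
  .select(explode(col("items")).as("item1"), col("items"))
  .select(col("item1"), explode(col("items")).as("item2"))

The schema and data sample of grouped are: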

root
 |-- item1: long (nullable = true)
 |-- item2: long (nullable = true)
 |-- count: long (nullable = false)

[1,2,2]
[1,3,2]
[1,4,1]
[1,5,3]
[2,1,2]
[2,3,1]
[2,4,1]
[2,5,2]
...

So, my question is: how do I now group on the first item in each result so that I have a list of tuples? For the example data above, I would expect something similar to this:

[1, [(2, 2), (3, 2), (4, 1), (5, 3)]]
[2, [(1, 2), (3, 1), (4, 1), (5, 2)]]

As you can see in my script with recs, I thought I would start by doing a groupBy on "item1", which is the first item in each row. But after that you are left with a GroupedData object that supports very few operations; really, you are only left with aggregations like sum, avg, etc. I just want to list the tuples from each result.

I could easily use RDD functions at this point, but that departs from using DataFrames. Is there a way to do this with the DataFrame functions?
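For reference, the RDD fallback I have in mind is something like this (a rough sketch, untested):

// Drop to RDD[Row], key each row by item1, and group the (item2, count) pairs.
val recsRdd = grouped.rdd
  .map(row => (row.getLong(0), (row.getLong(1), row.getLong(2))))
  .groupByKey()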

asked Aug 06 '15 by kruthar


1 Answer

You can build that with collect_list and struct from org.apache.spark.sql.functions, available since Spark 1.6:

import org.apache.spark.sql.functions.{collect_list, struct}
import sql.implicits._ // for the 'symbol column syntax

val recs = grouped
  .groupBy('item1)
  .agg(collect_list(struct('item2, 'count)).as("set"))


+-----+----------------------------+
|item1|set                         |
+-----+----------------------------+
|1    |[[5,3], [4,1], [3,2], [2,2]]|
|2    |[[4,1], [1,2], [5,2], [3,1]]|
+-----+----------------------------+

You can also use collect_set.
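A sketch of the same aggregation with collect_set, which drops duplicate structs (assuming the same grouped DataFrame as above):

import org.apache.spark.sql.functions.{collect_set, struct}

val recsSet = grouped
  .groupBy('item1)
  .agg(collect_set(struct('item2, 'count)).as("set"))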

Edit: for information, tuples don't exist in DataFrames. The closest structure is a struct, since structs are the equivalent of case classes in the untyped Dataset API.
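If you really need Scala tuples, one option (a sketch, not from the original answer) is to unpack the structs on the driver after collecting, since each struct comes back as a nested Row:

import org.apache.spark.sql.Row

recs.collect().foreach { row =>
  val item1 = row.getLong(0)
  // Each element of the "set" column is a Row(item2, count).
  val pairs = row.getSeq[Row](1).map(r => (r.getLong(0), r.getLong(1)))
  println(s"$item1 -> $pairs")
}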

Edit 2: also be warned that collect_set comes with a caveat: the result is not actually a set, because there is no data type with set properties among the SQL types. That means you can end up with distinct "sets" that differ only in their element order (in version 2.1.0 at least). Sorting them with sort_array is then necessary.
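A sketch of that fix, wrapping the aggregate in sort_array so that two "sets" with the same elements always compare equal:

import org.apache.spark.sql.functions.{collect_set, sort_array, struct}

val recsSorted = grouped
  .groupBy('item1)
  .agg(sort_array(collect_set(struct('item2, 'count))).as("set"))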

answered Sep 17 '22 by Wilmerton