I am trying to do some analysis on sets. I have a sample data set that looks like this:
orders.json
{"items":[1,2,3,4,5]}
{"items":[1,2,5]}
{"items":[1,3,5]}
{"items":[3,4,5]}
Each line is a single field containing a list of numbers that represent IDs.
Here is the Spark script I am trying to run:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("Dataframe Test")
val sc = new SparkContext(sparkConf)
val sql = new SQLContext(sc)

val dataframe = sql.read.json("orders.json")

// Pair every item with every other item from the same order
val expanded = dataframe
  .explode[::[Long], Long]("items", "item1")(row => row)
  .explode[::[Long], Long]("items", "item2")(row => row)

// Drop self-pairs, then count each distinct (item1, item2) pair
val grouped = expanded
  .where(expanded("item1") !== expanded("item2"))
  .groupBy("item1", "item2")
  .count()

val recs = grouped
  .groupBy("item1")
Creating expanded and grouped works fine. In a nutshell, expanded is a list of all the possible pairs of IDs where the two IDs appeared in the same original set. grouped filters out IDs that were matched with themselves, then groups all the unique pairs of IDs together and produces a count for each. The schema and a data sample of grouped are:
root
|-- item1: long (nullable = true)
|-- item2: long (nullable = true)
|-- count: long (nullable = false)
[1,2,2]
[1,3,2]
[1,4,1]
[1,5,3]
[2,1,2]
[2,3,1]
[2,4,1]
[2,5,2]
...
So, my question is: how do I now group on the first item in each result so that I have a list of tuples? For the example data above, I would expect something similar to this:
[1, [(2, 2), (3, 2), (4, 1), (5, 3)]]
[2, [(1, 2), (3, 1), (4, 1), (5, 2)]]
As you can see in my script with recs, I thought you would start by doing a groupBy on item1, the first item in each row. But after that you are left with a GroupedData object that has very limited operations on it: really, you can only do aggregations like sum, avg, etc. I just want to list the tuples from each result.

I could easily use RDD functions at this point, but that departs from using DataFrames. Is there a way to do this with the DataFrame functions?
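For reference, the RDD fallback I mean would be something like this rough sketch (recsRdd is just an illustrative name; it assumes grouped as defined above):

import org.apache.spark.rdd.RDD

// Each row of `grouped` is (item1, item2, count)
val recsRdd: RDD[(Long, List[(Long, Long)])] = grouped.rdd
  .map(row => (row.getLong(0), (row.getLong(1), row.getLong(2))))
  .groupByKey()          // gather all (item2, count) tuples per item1
  .mapValues(_.toList)   // materialize the iterable as a list

recsRdd.collect().foreach(println)
// e.g. (1, List((2,2), (3,2), (4,1), (5,3)))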
You can build that with org.apache.spark.sql.functions (collect_list and struct), available since 1.6:

import sql.implicits._
import org.apache.spark.sql.functions.{collect_list, struct}

val recs = grouped
  .groupBy('item1)
  .agg(collect_list(struct('item2, 'count)).as("set"))
+-----+----------------------------+
|item1|set |
+-----+----------------------------+
|1 |[[5,3], [4,1], [3,2], [2,2]]|
|2 |[[4,1], [1,2], [5,2], [3,1]]|
+-----+----------------------------+
You can also use collect_set.
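For instance, a sketch of the same aggregation with collect_set, which drops duplicate (item2, count) structs:

import org.apache.spark.sql.functions.{collect_set, struct}

// Same shape of result, but duplicate structs are removed
val recsSet = grouped
  .groupBy('item1)
  .agg(collect_set(struct('item2, 'count)).as("set"))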
Edit: for information, tuples don't exist in DataFrames. The closest structure is struct: in the untyped DataFrame API, structs play the role that case classes play in the typed Dataset API.
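A minimal sketch of that correspondence, assuming Spark 2.x with a SparkSession named spark, and hypothetical case class names Pair and Rec that mirror the struct fields above:

// Hypothetical case classes matching the columns of `recs` by name
case class Pair(item2: Long, count: Long)
case class Rec(item1: Long, set: Seq[Pair])

import spark.implicits._

// The struct fields line up with the case-class fields, so the
// untyped result can be viewed as a typed Dataset
val typedRecs = recs.as[Rec]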
Edit 2: Also be warned that collect_set comes with the caveat that the result is actually not a set (there is no data type with set properties in the SQL types). That means you can end up with distinct "sets" which differ only by their order (in version 2.1.0 at least). Sorting them with sort_array is then necessary.
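A sketch of that normalization (this assumes a Spark version where sort_array accepts arrays of structs, as in the 2.x line):

import org.apache.spark.sql.functions.{collect_set, sort_array, struct}

// Sort each collected array so that equal "sets" also compare equal
val recsSorted = grouped
  .groupBy('item1)
  .agg(sort_array(collect_set(struct('item2, 'count))).as("set"))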