I have a dataset containing data like the following:
| c1 | c2 |
-----------
| 1  | a  |
| 1  | b  |
| 1  | c  |
| 2  | a  |
| 2  | b  |
...
Now, I want to get the data grouped like the following (col1: String Key, col2: List):
| c1 | c2    |
---------------
| 1  | a,b,c |
| 2  | a,b   |
...
I thought that using groupByKey would be a sufficient solution, but I can't find any example of how to use it.
Can anyone help me find a solution using groupByKey, or any other combination of transformations and actions, that produces this output with Datasets rather than RDDs?
In Spark, groupByKey is a frequently used transformation that shuffles data. It takes key-value pairs (K, V) as input, groups the values by key, and produces a dataset of (K, Iterable<V>) pairs as output.
When groupByKey() is applied to a dataset of (K, V) pairs, the records are shuffled across partitions according to the key K to form a new distributed collection. This transformation can send a lot of unnecessary data over the network, because every value for a key is moved to a single partition before any aggregation happens.
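For completeness, here is a minimal sketch of the typed groupByKey API on a Dataset (Scala, Spark 2.x; the object name and the use of mkString are illustrative assumptions, not from the question). groupByKey returns a KeyValueGroupedDataset, and mapGroups reduces each group's iterator to one output row. Because this path materializes whole groups, the relational groupBy/collect_list approach shown below is usually preferable:

import org.apache.spark.sql.SparkSession

object GroupByKeySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("GroupByKeySketch").master("local").getOrCreate()
    import spark.implicits._

    // typed Dataset of (key, value) pairs
    val ds = Seq((1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b")).toDS()

    // group by the first tuple element, then join each group's values
    val grouped = ds.groupByKey(_._1)
      .mapGroups((key, rows) => (key, rows.map(_._2).mkString(",")))
      .toDF("c1", "c2")

    grouped.show()
    spark.stop()
  }
}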
A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row. Operations available on Datasets are divided into transformations and actions.
A Dataset is a distributed collection of data. It is a new interface, added in Spark 1.6, that provides the benefits of RDDs (strong typing, the ability to use powerful lambda functions) together with the benefits of Spark SQL's optimized execution engine.
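To make the Dataset/DataFrame relationship concrete, here is a small Scala sketch (it assumes Spark 2.x and an existing SparkSession named spark; the variable names are made up for illustration):

import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._

// strongly typed collection of domain objects
val typed: Dataset[(Int, String)] = Seq((1, "a"), (2, "b")).toDS()

// the untyped view: a DataFrame is just a Dataset[Row]
val untyped: DataFrame = typed.toDF("c1", "c2")

// and back to a typed view
val typedAgain: Dataset[(Int, String)] = untyped.as[(Int, String)]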
Here is a Spark 2.0 Java example using a Dataset:
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

import static org.apache.spark.sql.functions.collect_list;

public class SparkSample {
    public static void main(String[] args) {
        // SparkSession
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkSample")
                .config("spark.sql.warehouse.dir", "file:///C:/temp")
                .master("local")
                .getOrCreate();

        // input data
        List<Tuple2<Integer, String>> inputList = new ArrayList<>();
        inputList.add(new Tuple2<>(1, "a"));
        inputList.add(new Tuple2<>(1, "b"));
        inputList.add(new Tuple2<>(1, "c"));
        inputList.add(new Tuple2<>(2, "a"));
        inputList.add(new Tuple2<>(2, "b"));

        // create a DataFrame with columns c1 and c2
        Dataset<Row> dataSet = spark
                .createDataset(inputList, Encoders.tuple(Encoders.INT(), Encoders.STRING()))
                .toDF("c1", "c2");
        dataSet.show();

        // group by c1 and collect the c2 values of each group into a list
        Dataset<Row> dataSet1 = dataSet
                .groupBy("c1")
                .agg(collect_list("c2"))
                .toDF("c1", "c2");
        dataSet1.show();

        // stop
        spark.stop();
    }
}
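A small note on the aggregate above: agg(collect_list("c2")) yields a column named collect_list(c2), and the trailing toDF("c1", "c2") simply renames both columns. Writing agg(collect_list("c2").as("c2")) would be an equivalent way to get the same schema.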
With a DataFrame in Spark 2.0:
scala> val data = List((1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b")).toDF("c1", "c2")
data: org.apache.spark.sql.DataFrame = [c1: int, c2: string]
scala> data.groupBy("c1").agg(collect_list("c2")).collect.foreach(println)
[1,WrappedArray(a, b, c)]
[2,WrappedArray(a, b)]
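If the goal is the literal comma-separated string shown in the question rather than an array column, concat_ws can wrap the collected list. A small variation on the same session (not from the original answer; note that collect_list does not guarantee element order within a group):

scala> data.groupBy("c1").agg(concat_ws(",", collect_list("c2")).as("c2")).collect.foreach(println)

This prints each row with c2 as a plain string, e.g. [1,a,b,c] instead of [1,WrappedArray(a, b, c)].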