 

GroupByKey with datasets in Spark 2.0 using Java

I have a dataset containing data like the following:

| c1 | c2 |
-----------
| 1  | a  |
| 1  | b  |
| 1  | c  |
| 2  | a  |
| 2  | b  |

...

Now I want the data grouped like the following (c1: key, c2: list of values):

| c1 | c2    |
--------------
| 1  | a,b,c |
| 2  | a,b   |
...

I thought that using groupByKey would be a sufficient solution, but I can't find any example of how to use it.

Can anyone help me find a solution using groupByKey, or any other combination of transformations and actions, to get this output using Datasets rather than RDDs?

asked Sep 08 '16 by Andreas

People also ask

How do I use GroupByKey in spark?

In Spark, groupByKey is a frequently used transformation that shuffles data. It takes a dataset of key-value pairs (K, V) as input, groups the values by key, and produces a dataset of (K, Iterable&lt;V&gt;) pairs as output.
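As a minimal sketch of that behaviour at the RDD level (assuming an existing JavaSparkContext named sc; the data here is made up for illustration):

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// assumes an existing JavaSparkContext "sc"
JavaPairRDD<Integer, String> pairs = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>(1, "a"), new Tuple2<>(1, "b"), new Tuple2<>(2, "a")));
// values with the same key end up in one Iterable
JavaPairRDD<Integer, Iterable<String>> grouped = pairs.groupByKey();
grouped.collect().forEach(t -> System.out.println(t._1() + " -> " + t._2()));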

How is data shuffled using the GroupByKey function in spark?

When groupByKey() is applied to a dataset of (K, V) pairs, the data is shuffled across the network according to the key K to produce a new RDD of (K, Iterable&lt;V&gt;) pairs. Because every value is moved to the partition of its key, this transformation can transfer a lot of unnecessary data over the network.
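When only an aggregate per key is needed, reduceByKey is commonly preferred because it combines values locally on each partition before the shuffle, so less data crosses the network. A minimal sketch, reusing the hypothetical pairs RDD from the snippet above:

// reduceByKey merges values per key on each partition first,
// so far less data is shuffled than with groupByKey
JavaPairRDD<Integer, String> concatenated =
        pairs.reduceByKey((a, b) -> a + "," + b);
concatenated.collect().forEach(t -> System.out.println(t._1() + " -> " + t._2()));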

What is Dataset in Java spark?

A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row. Operations available on Datasets are divided into transformations and actions.
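As a small illustration (a sketch assuming an existing SparkSession named spark):

import java.util.Arrays;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// assumes an existing SparkSession "spark"
Dataset<String> words = spark.createDataset(Arrays.asList("a", "bb", "ccc"), Encoders.STRING());
// map is a (lazy) transformation; count is an action that triggers execution
Dataset<Integer> lengths = words.map((MapFunction<String, Integer>) String::length, Encoders.INT());
System.out.println(lengths.count()); // prints 3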

Can we use Datasets in Pyspark?

A Dataset is a distributed collection of data. The Dataset interface was added in Spark 1.6 and provides the benefits of RDDs (strong typing, the ability to use powerful lambda functions) together with the benefits of Spark SQL's optimized execution engine. The typed Dataset API is available in Scala and Java; Python does not have it, but the DataFrame API offers many of the same benefits.


2 Answers

Here is a Spark 2.0 Java example using a Dataset.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class SparkSample {
    public static void main(String[] args) {
        // SparkSession
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkSample")
                .config("spark.sql.warehouse.dir", "/file:C:/temp")
                .master("local")
                .getOrCreate();
        // input data
        List<Tuple2<Integer, String>> inputList = new ArrayList<>();
        inputList.add(new Tuple2<>(1, "a"));
        inputList.add(new Tuple2<>(1, "b"));
        inputList.add(new Tuple2<>(1, "c"));
        inputList.add(new Tuple2<>(2, "a"));
        inputList.add(new Tuple2<>(2, "b"));
        // dataset with columns c1 and c2
        Dataset<Row> dataSet = spark
                .createDataset(inputList, Encoders.tuple(Encoders.INT(), Encoders.STRING()))
                .toDF("c1", "c2");
        dataSet.show();
        // group by c1 and collect the c2 values into a list
        Dataset<Row> dataSet1 = dataSet
                .groupBy("c1")
                .agg(org.apache.spark.sql.functions.collect_list("c2"))
                .toDF("c1", "c2");
        dataSet1.show();
        // stop
        spark.stop();
    }
}
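Since the question asks specifically about groupByKey, a typed alternative is Dataset.groupByKey followed by mapGroups. The following is only a sketch, reusing the dataSet variable from the example above; it concatenates the grouped values into plain strings instead of an array column:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;

// typed alternative: group the rows by c1 and concatenate the c2 values
Dataset<String> grouped = dataSet
        .groupByKey((MapFunction<Row, Integer>) row -> row.getInt(0), Encoders.INT())
        .mapGroups((MapGroupsFunction<Integer, Row, String>) (key, rows) -> {
            StringBuilder values = new StringBuilder();
            rows.forEachRemaining(r -> values.append(r.getString(1)).append(" "));
            return key + ": " + values.toString().trim();
        }, Encoders.STRING());
grouped.show();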
answered Oct 04 '22 by abaghel

With a DataFrame in Spark 2.0 (Scala):

scala> val data = List((1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b")).toDF("c1", "c2")
data: org.apache.spark.sql.DataFrame = [c1: int, c2: string]
scala> data.groupBy("c1").agg(collect_list("c2")).collect.foreach(println)
[1,WrappedArray(a, b, c)]
[2,WrappedArray(a, b)]
answered Oct 04 '22 by J Bentz