
Spark DataFrame: does groupBy after orderBy maintain that order?

I have a Spark 2.0 dataframe example with the following structure:

id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.

It contains 24 entries for each id (one for each hour of the day) and is ordered by id, hour using the orderBy function.

I have created an Aggregator groupConcat:

  def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
    override def zero: String = ""
    override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)
    override def merge(b1: String, b2: String) = b1 + b2
    override def finish(b: String) = b.substring(1)
    override def bufferEncoder: Encoder[String] = Encoders.STRING
    override def outputEncoder: Encoder[String] = Encoders.STRING
  }.toColumn

It helps me concatenate columns into strings to obtain this final dataframe:

id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.

My question is, if I do example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count"), does that guarantee that the hourly counts will be ordered correctly in their respective buckets?

I read that this is not necessarily the case for RDDs (see Spark sort by key and then group by to get ordered iterable?), but maybe it's different for DataFrames?

If not, how can I work around it?

Ana Todor asked Sep 15 '16.

People also ask

Does collect_list maintain order?

Does collect_list also maintain order? If you sort the entire dataset before calling collect_list(), then yes. But this is not necessary: it is more efficient to collect both date and value into a list of tuples and sort the resulting list after collecting.
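The sort-after-collecting idea can be sketched with plain Scala collections (hypothetical data, not Spark's API):

```scala
// Hypothetical unordered result of collecting (hour, count) pairs
val collected = Seq((23, 44), (0, 12), (1, 55))
// Sort by hour only after collecting, then join the counts
val hourlyCount = collected.sortBy(_._1).map(_._2).mkString(":")
// hourlyCount == "12:55:44"
```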

How does groupBy work in Spark?

The groupBy method is defined in the Dataset class. groupBy returns a RelationalGroupedDataset object, on which the agg() method is defined. Spark makes great use of object-oriented programming! The RelationalGroupedDataset class also defines a sum() method that can be used to get the same result with less code.
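As a rough analogy in plain Scala collections (not Spark's RelationalGroupedDataset API), grouping followed by a sum looks like:

```scala
// Group rows by id, then sum the counts within each group
val rows = Seq(("id1", 12), ("id1", 55), ("id2", 89))
val sums = rows.groupBy(_._1).map { case (id, grouped) => id -> grouped.map(_._2).sum }
// sums == Map("id1" -> 67, "id2" -> 89)
```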

How does PySpark sort grouped data?

You can sort grouped data using the sort() function, accessing the column via the col() function and applying desc() to sort it in descending order.
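The same descending-sort idea, sketched in plain Scala rather than PySpark (hypothetical data):

```scala
// Sort (id, count) rows by count in descending order
val counts = Seq(("id1", 12), ("id2", 89), ("id1", 44))
val sortedDesc = counts.sortBy(-_._2)
// sortedDesc == Seq(("id2", 89), ("id1", 44), ("id1", 12))
```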

Is groupBy an action in Spark?

No. Filter, groupBy, and map are examples of transformations. Actions are operations that also apply to an RDD, but instruct Spark to perform the computation and send the result back to the driver.
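The lazy-transformation vs. eager-action distinction can be mimicked with Scala's lazy views (a sketch of the concept, not Spark itself):

```scala
var evaluated = 0
// A "transformation": building the view runs nothing yet
val doubled = Seq(1, 2, 3).view.map { x => evaluated += 1; x * 2 }
assert(evaluated == 0)          // lazy: no element has been computed so far
// An "action": forcing the result triggers the evaluation
val result = doubled.toList
assert(result == List(2, 4, 6) && evaluated == 3)
```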


1 Answer

groupBy after orderBy doesn't maintain order, as others have pointed out. What you want to do is use a Window function, partitioned on id and ordered by hour. You can collect_list over this window and then take the max (largest) of the resulting lists, since they grow cumulatively (i.e. the first hour will only have itself in the list, the second hour will have 2 elements in the list, and so on).
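The cumulative-list idea can be sketched with plain Scala collections (hypothetical single-id data; in Spark, collect_list over the window builds these per-row lists, and since each list is a prefix of the next, the lexicographic max is also the longest, complete one):

```scala
// Rows for one id: (hour, count), possibly unordered
val rows = Seq((23, 44), (0, 12), (1, 55))
// Cumulative lists in hour order, as the windowed collect_list would build them
val cumulative = rows.sortBy(_._1)
  .scanLeft(List.empty[Int])((acc, r) => acc :+ r._2)
  .tail
// cumulative == List(List(12), List(12, 55), List(12, 55, 44))
// The longest list is the complete, hour-ordered one
val complete = cumulative.maxBy(_.length)
// complete == List(12, 55, 44)
```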

Complete example code:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val data = Seq(
  ("id1", 0, 12),
  ("id1", 1, 55),
  ("id1", 23, 44),
  ("id2", 0, 12),
  ("id2", 1, 89),
  ("id2", 23, 34)).toDF("id", "hour", "count")

val mergeList = udf { (strings: Seq[String]) => strings.mkString(":") }

data.withColumn("collected", collect_list($"count")
      .over(Window.partitionBy("id").orderBy("hour")))
    .groupBy("id")
    .agg(max($"collected").as("collected"))
    .withColumn("hourly_count", mergeList($"collected"))
    .select("id", "hourly_count")
    .show

This keeps us within the DataFrame world. I also simplified the UDF code the OP was using.

Output:

+---+------------+
| id|hourly_count|
+---+------------+
|id1|    12:55:44|
|id2|    12:89:34|
+---+------------+
Adair answered Oct 11 '22.