I have a Spark 2.0 dataframe `example` with the following structure:
```
id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.
```
It contains 24 entries for each id (one for each hour of the day) and is ordered by id, hour using the orderBy function.
I have created an Aggregator `groupConcat`:
```scala
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.expressions.Aggregator

def groupConcat(separator: String, columnToConcat: Int) =
  new Aggregator[Row, String, String] with Serializable {
    override def zero: String = ""
    override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)
    override def merge(b1: String, b2: String) = b1 + b2
    override def finish(b: String) = b.substring(1)
    override def bufferEncoder: Encoder[String] = Encoders.STRING
    override def outputEncoder: Encoder[String] = Encoders.STRING
  }.toColumn
```
It helps me concatenate a column's values into a string to obtain this final dataframe:
```
id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.
```
My question is: if I do

```scala
example.orderBy($"id", $"hour").groupBy("id").agg(groupConcat(":", 2) as "hourly_count")
```

does that guarantee that the hourly counts will be ordered correctly in their respective buckets?
I read that this is not necessarily the case for RDDs (see Spark sort by key and then group by to get ordered iterable?), but maybe it's different for DataFrames?
If not, how can I work around it?
`groupBy` after `orderBy` doesn't maintain order, as others have pointed out. What you want to do is use a Window function, partitioned on id and ordered by hour. You can `collect_list` over this window and then take the max (largest) of the resulting lists, since they grow cumulatively (i.e. the first hour has only itself in its list, the second hour has two elements, and so on).
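To see that cumulative behaviour, here is a minimal sketch that just inspects the intermediate windowed column; it assumes the `data` DataFrame and imports defined in the complete example below:

```scala
// Sketch: inspect the cumulative lists that collect_list builds over the window.
// Assumes the `data` DataFrame and imports from the complete example below.
val byHour = Window.partitionBy("id").orderBy("hour")

data
  .withColumn("collected", collect_list($"count").over(byHour))
  .orderBy("id", "hour")
  .show(false)
// For id1 this shows [12], then [12, 55], then [12, 55, 44]:
// the row for the last hour carries the complete, hour-ordered list,
// which is why taking max of the lists per id recovers it.
```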
Complete example code:
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val data = Seq(
  ("id1", 0, 12),
  ("id1", 1, 55),
  ("id1", 23, 44),
  ("id2", 0, 12),
  ("id2", 1, 89),
  ("id2", 23, 34)
).toDF("id", "hour", "count")

// Joins the collected counts with ":" separators.
val mergeList = udf { (strings: Seq[String]) => strings.mkString(":") }

data
  // Build cumulative lists of counts per id, in hour order.
  .withColumn("collected", collect_list($"count")
    .over(Window.partitionBy("id").orderBy("hour")))
  // The longest (max) list per id is the complete, ordered one.
  .groupBy("id")
  .agg(max($"collected").as("collected"))
  .withColumn("hourly_count", mergeList($"collected"))
  .select("id", "hourly_count")
  .show
```
This keeps us within the DataFrame world. I also simplified the UDF code the OP was using.
Output:
```
+---+------------+
| id|hourly_count|
+---+------------+
|id1|    12:55:44|
|id2|    12:89:34|
+---+------------+
```
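A design note: if you would rather avoid the window-plus-max trick, one common alternative (a sketch of my own, not the code above) is to collect (hour, count) pairs with a plain `groupBy` and sort the collected array afterwards; `sort_array` orders an array of structs by its first field, so the counts come out in hour order:

```scala
import org.apache.spark.sql.Row

// Sketch of an alternative: collect (hour, count) pairs in one pass,
// sort the array by hour afterwards, then join the counts.
// Assumes the same `data` DataFrame and imports as above.
val mergeSorted = udf { (pairs: Seq[Row]) => pairs.map(_.getInt(1)).mkString(":") }

data
  .groupBy("id")
  .agg(sort_array(collect_list(struct($"hour", $"count"))).as("pairs"))
  .withColumn("hourly_count", mergeSorted($"pairs"))
  .select("id", "hourly_count")
  .show
```

This skips the per-row cumulative lists, at the cost of sorting each collected array after the fact.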