 

Spark and SparkSQL: How to imitate window function?

Description

Given a dataframe df

id |       date
---------------
 1 | 2015-09-01
 2 | 2015-09-01
 1 | 2015-09-03
 1 | 2015-09-04
 2 | 2015-09-04

I want to create a running counter or index,

  • grouped by the same id and
  • sorted by date in that group,

thus

id |       date |  counter
--------------------------
 1 | 2015-09-01 |        1
 1 | 2015-09-03 |        2
 1 | 2015-09-04 |        3
 2 | 2015-09-01 |        1
 2 | 2015-09-04 |        2

This is something I can achieve with a window function, e.g.

val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select( df("id"), rowNumber().over(w) )

Unfortunately, Spark 1.4.1 does not support window functions for regular DataFrames:

org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;

Questions

  • How can I achieve the above computation on current Spark 1.4.1 without using window functions?
  • When will window functions for regular dataframes be supported in Spark?

Thanks!

Asked Sep 04 '15 by Martin Senne




3 Answers

You can use HiveContext for local DataFrames as well and, unless you have a very good reason not to, it is probably a good idea anyway. It is the default SQLContext available in the spark-shell and pyspark shells (as of now, SparkR seems to use a plain SQLContext), and its parser is the one recommended by the Spark SQL and DataFrame Guide.

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

object HiveContextTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Hive Context")
    val sc = new SparkContext(conf)
    // A HiveContext is what makes window functions available in Spark 1.4.x
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    // Small sample DataFrame with a key column "k" and a value column "v"
    val df = sc.parallelize(
        ("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil
    ).toDF("k", "v")

    // Row number within each "k" partition, ordered by "v"
    val w = Window.partitionBy($"k").orderBy($"v")
    df.select($"k", $"v", rowNumber.over(w).alias("rn")).show
  }
}
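
The same pattern applied to the DataFrame from the question gives the running counter directly. A minimal sketch, assuming df has the id and date columns shown above:

// Row number per id, ordered by date, used as the requested counter
val w = Window.partitionBy($"id").orderBy($"date")
val resultDF = df.select($"id", $"date", rowNumber.over(w).alias("counter"))
resultDF.show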
Answered by zero323


You can do this with RDDs. Personally I find the API for RDDs makes a lot more sense - I don't always want my data to be 'flat' like a dataframe.

val df = sqlContext.sql("select 1, '2015-09-01'"
    ).unionAll(sqlContext.sql("select 2, '2015-09-01'")
    ).unionAll(sqlContext.sql("select 1, '2015-09-03'")
    ).unionAll(sqlContext.sql("select 1, '2015-09-04'")
    ).unionAll(sqlContext.sql("select 2, '2015-09-04'"))

// dataframe as an RDD (of Row objects)
df.rdd 
  // grouping by the first column of the row
  .groupBy(r => r(0)) 
  // map each group - an Iterable[Row] - to a list and sort by the second column
  .map(g => g._2.toList.sortBy(row => row(1).toString))     
  .collect()

The above gives a result like the following:

Array[List[org.apache.spark.sql.Row]] = 
Array(
  List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]), 
  List([2,2015-09-01], [2,2015-09-04]))

If you want the position within the 'group' as well, you can use zipWithIndex.

df.rdd.groupBy(r => r(0)).map(g => 
    g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect()

Array[List[(org.apache.spark.sql.Row, Int)]] = Array(
  List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)),
  List(([2,2015-09-01],0), ([2,2015-09-04],1)))

You could flatten this back to a simple List/Array of Row objects using flatMap, but if you need to perform anything on the 'group' itself, that won't be a great idea.
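
For example, here is a minimal sketch of that flattening which also attaches a 1-based counter, assuming (as in the sample data here) that the first column is an Int id and the second a date String:

// Flatten each sorted group and attach its 1-based position as the counter
val withCounter = df.rdd
  .groupBy(r => r(0))
  .flatMap { case (_, rows) =>
    rows.toList
      .sortBy(row => row(1).toString)
      .zipWithIndex
      .map { case (row, idx) => (row.getInt(0), row.getString(1), idx + 1) }
  }

withCounter.collect()
// e.g. Array((1,2015-09-01,1), (1,2015-09-03,2), (1,2015-09-04,3), (2,2015-09-01,1), (2,2015-09-04,2))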

The downside to using RDDs like this is that it's tedious to convert from DataFrame to RDD and back again.
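
That said, continuing the sketch above, going back to a DataFrame is short once the flattened values are typed tuples (assuming sqlContext is in scope, as earlier in this answer):

// Back to a DataFrame with named columns
import sqlContext.implicits._
val resultDF = withCounter.toDF("id", "date", "counter")
resultDF.show()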

Answered by Kirk Broadhurst


I totally agree that window functions for DataFrames are the way to go if you have Spark version >= 1.5. But if you are really stuck with an older version (e.g. 1.4.1), here is a hacky way to solve this:

import org.apache.spark.sql.functions.count
import sqlContext.implicits._

val df = sc.parallelize((1, "2015-09-01") :: (2, "2015-09-01") :: (1, "2015-09-03") :: (1, "2015-09-04") :: (2, "2015-09-04") :: Nil)
           .toDF("id", "date")

// Self-join on id, then count for every row how many rows in the same
// group have an earlier-or-equal date: that count is the running counter.
val dfDuplicate = df.selectExpr("id as idDup", "date as dateDup")
val dfWithCounter = df.join(dfDuplicate, $"id" === $"idDup")
                      .where($"date" >= $"dateDup")
                      .groupBy($"id", $"date")
                      .agg($"id", $"date", count($"idDup").as("counter"))
                      .select($"id", $"date", $"counter")

Now if you do dfWithCounter.show, you will get:

+---+----------+-------+                                                        
| id|      date|counter|
+---+----------+-------+
|  1|2015-09-01|      1|
|  1|2015-09-04|      3|
|  1|2015-09-03|      2|
|  2|2015-09-01|      1|
|  2|2015-09-04|      2|
+---+----------+-------+

Note that the dates are not sorted, but the counter is correct. Also, you can reverse the direction of the counter by changing the >= to <= in the where clause.
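
If you want the rows displayed in order, just sort afterwards. The same counting self-join can also be written as plain SQL; the sketch below assumes the DataFrame is registered under the illustrative temp table name events:

dfWithCounter.orderBy($"id", $"date").show

// SQL version of the same self-join (sketch)
df.registerTempTable("events")
sqlContext.sql("""
  SELECT a.id, a.date, COUNT(b.date) AS counter
  FROM events a JOIN events b
    ON a.id = b.id AND a.date >= b.date
  GROUP BY a.id, a.date
""").show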

Answered by Sayon M