
Group days into weeks with totals PySpark

I recently had help with a similar query (DATE_ADD or DATE_DIFF error when grouping dates in BigQuery), but I was wondering how to do this in PySpark, as I am rather new to it.

day         bitcoin_total   dash_total
2009-01-03  1               0
2009-01-09  14              0
2009-01-10  61              0

The desired outcome would be grouped by the date at the start of the week (Monday or Sunday, whichever):

day         bitcoin_total   dash_total
2008-12-28  1               0
2009-01-04  75              0

The code below returns weeks by number, and the totals seem off. I cannot replicate the totals that .agg(sum()) returns, and I cannot even add the second total (dash_total); I tried .col("dash_total"). Is there a way to group days into weeks?

from pyspark.sql.functions import weekofyear, sum

(df
    .groupBy(weekofyear("day").alias("date_by_week"))
    .agg(sum("bitcoin_total"))
    .orderBy("date_by_week")
    .show())

I am running Spark on Databricks.

asked Aug 04 '19 by SozDaneron

1 Answer

Try this approach using the date_sub and next_day functions in Spark. weekofyear only returns a week number (and the same number repeats every year), which can make the grouped totals look off; computing the actual start date of each week avoids that.

Explanation:

date_sub(
    next_day(col("day"), "sunday"),  // get the next Sunday after the date
    7)                               // subtract a week to land on the current week's Sunday
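
For instance, tracing one date through the two functions (a minimal sketch, assuming an active SparkSession named spark):

from pyspark.sql.functions import col, date_sub, next_day

# 2009-01-03 is a Saturday: next_day(..., "sunday") returns 2009-01-04,
# and stepping back 7 days lands on 2008-12-28, the Sunday that starts
# that week. A date that is already a Sunday maps to itself, because
# next_day is strictly after its input.
spark.createDataFrame([("2009-01-03",), ("2009-01-04",)], ["day"]) \
    .select("day", date_sub(next_day(col("day"), "sunday"), 7).alias("week_strt_day")) \
    .show()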

Example:

In PySpark:

from pyspark.sql.functions import col, date_sub, next_day, sum

df = sc.parallelize([("2009-01-03", "1", "0"),
                     ("2009-01-09", "14", "0"),
                     ("2009-01-10", "61", "0")]).toDF(["day", "bitcoin_total", "dash_total"])

(df
    # next Sunday, minus 7 days = the Sunday starting the row's week
    .withColumn("week_strt_day", date_sub(next_day(col("day"), "sunday"), 7))
    .groupBy("week_strt_day")
    # the totals were created as strings, so sum() implicitly casts them to
    # double; casting back to int just trims the trailing ".0" in the output
    .agg(sum("bitcoin_total").cast("int").alias("bitcoin_total"),
         sum("dash_total").cast("int").alias("dash_total"))
    .orderBy("week_strt_day")
    .show())

Result:

+-------------+-------------+----------+
|week_strt_day|bitcoin_total|dash_total|
+-------------+-------------+----------+
|   2008-12-28|            1|         0|
|   2009-01-04|           75|         0|
+-------------+-------------+----------+

In Scala:

import org.apache.spark.sql.functions._

// assumes spark-shell; in an application, add `import spark.implicits._`
// on your SparkSession for toDF and the 'day symbol syntax
val df = Seq(("2009-01-03", "1", "0"),
             ("2009-01-09", "14", "0"),
             ("2009-01-10", "61", "0")).toDF("day", "bitcoin_total", "dash_total")

df.withColumn("week_strt_day", date_sub(next_day('day, "sunday"), 7))
  .groupBy("week_strt_day")
  .agg(sum("bitcoin_total").cast("int").alias("bitcoin_total"),
       sum("dash_total").cast("int").alias("dash_total"))
  .orderBy("week_strt_day")
  .show()

Result:

+-------------+-------------+----------+
|week_strt_day|bitcoin_total|dash_total|
+-------------+-------------+----------+
|   2008-12-28|            1|         0|
|   2009-01-04|           75|         0|
+-------------+-------------+----------+
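
As a side note: if a Monday-based week start is acceptable, Spark 2.3+ also has date_trunc, which truncates a timestamp to the Monday starting its week. A sketch of the same aggregation on that basis, reusing the PySpark df from above (not part of the original approach):

from pyspark.sql.functions import col, date_trunc, sum, to_date

# date_trunc("week", ...) returns a timestamp at the Monday starting the
# week; to_date strips the time component back to a date
(df
    .withColumn("week_strt_day", to_date(date_trunc("week", col("day"))))
    .groupBy("week_strt_day")
    .agg(sum("bitcoin_total").cast("int").alias("bitcoin_total"),
         sum("dash_total").cast("int").alias("dash_total"))
    .orderBy("week_strt_day")
    .show())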
answered Oct 17 '22 by notNull