
Apache Spark - Dealing with Sliding Windows on Temporal RDDs

I've been working quite a lot with Apache Spark over the last few months, but now I've been handed a pretty difficult task: computing average/minimum/maximum and so on over a sliding window on a paired RDD where the key is a date tag and the value is a matrix. Each aggregation function should therefore also return a matrix, where each cell holds that aggregate of the corresponding cell across all matrices in the time period.

I want to be able to say, for example, that I want the average over every 7 days, with the window sliding by one day. The window always slides by one unit, where the unit is whatever the window size is expressed in (so for a 12-week window, it slides by one week).

My initial thought is, if we want an average per X days, to simply iterate X times, each time grouping the elements by date with a different offset.

So if we have this scenario:

Days: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

Matrices: A B C D E F G H I J K L M N O

And we want the average per 5 days, I will iterate 5 times and show the grouping here:

First iteration:

Group 1: (1, A) (2, B) (3, C) (4, D) (5, E)

Group 2: (6, F) (7, G) (8, H) (9, I) (10, J)

Group 3: (11, K) (12, L) (13, M) (14, N) (15, O)

Second iteration:

Group 1: (2, B) (3, C) (4, D) (5, E) (6, F)

Group 2: (7, G) (8, H) (9, I) (10, J), (11, K)

Group 3: (12, L) (13, M) (14, N) (15, O)

And so on. For each group, I then have to do a fold/reduce procedure to get the average.
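To make that concrete, here is a minimal sketch of the iterate-and-group idea, assuming integer day keys and matrices flattened to Array[Double]; elemSum and slidingAverages are illustrative names of my own, not existing API:

import org.apache.spark.rdd.RDD

// Element-wise sum of two flattened matrices.
def elemSum(a: Array[Double], b: Array[Double]): Array[Double] =
  a.zip(b).map { case (x, y) => x + y }

// One RDD of window averages per offset; the result key is the
// first day of each window.
def slidingAverages(data: RDD[(Int, Array[Double])],
                    windowSize: Int): Seq[RDD[(Int, Array[Double])]] =
  (0 until windowSize).map { offset =>
    data
      .filter { case (day, _) => day > offset }  // drop the partial head
      .map { case (day, m) => ((day - 1 - offset) / windowSize, (m, 1)) }
      .reduceByKey { case ((m1, n1), (m2, n2)) => (elemSum(m1, m2), n1 + n2) }
      .map { case (bucket, (total, n)) =>
        (bucket * windowSize + offset + 1, total.map(_ / n))
      }
  }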

However, as you might imagine, this is pretty slow and probably a rather bad way to do it. I can't really figure out a better way to do it, though.

asked Dec 17 '14 by Johan S




1 Answer

If you convert to a DataFrame, this all gets a lot simpler -- you can just self-join the data back on itself and find the average. Say I have a series of data like this:

tsDF.show
date       amount
1970-01-01 10.0
1970-01-01 5.0
1970-01-01 7.0
1970-01-02 14.0
1970-01-02 13.9
1970-01-03 1.0
1970-01-03 5.0
1970-01-03 9.0
1970-01-04 9.0
1970-01-04 5.8
1970-01-04 2.8
1970-01-04 8.9
1970-01-05 8.1
1970-01-05 2.1
1970-01-05 2.78
1970-01-05 20.78

Which rolls up as:

tsDF.groupBy($"date").agg($"date", sum($"amount"), count($"date")).show
date       SUM(amount) COUNT(date)
1970-01-01 22.0        3
1970-01-02 27.9        2
1970-01-03 15.0        3
1970-01-04 26.5        4
1970-01-05 33.76       4

I then would need to create a UDF to shift the date for the join condition (note that I am only using a 2-day window, via offset = -2):

import java.util.Calendar
import org.apache.spark.sql.functions._  // udf, avg, etc.

// Shift a date back by `offset` days; used in the join condition below.
def dateShift(myDate: java.sql.Date): java.sql.Date = {
  val offset = -2
  val cal = Calendar.getInstance
  cal.setTime(myDate)
  cal.add(Calendar.DATE, offset)
  new java.sql.Date(cal.getTime.getTime)
}
val udfDateShift = udf[java.sql.Date, java.sql.Date](dateShift)
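As an aside, if you are on Spark 1.5 or later, the built-in date_sub function can replace this UDF entirely:

import org.apache.spark.sql.functions.date_sub

// Equivalent join condition without a custom UDF (Spark 1.5+):
val joinCond = $"r_date" > date_sub($"date", 2) and $"r_date" <= $"date"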

And then I could easily find a 2-day rolling average like this:

val windowDF = tsDF.select($"date")
  .groupBy($"date")
  .agg($"date")  // one row per distinct date
  .join(
    tsDF.select($"date" as "r_date", $"amount" as "r_amount"),
    $"r_date" > udfDateShift($"date") and $"r_date" <= $"date"  // rows in the 2-day window ending at date
  )
  .groupBy($"date")
  .agg($"date", avg($"r_amount") as "2 day avg amount / record")

windowDF.show
date       2 day avg amount / record
1970-01-01 7.333333333333333
1970-01-02 9.98
1970-01-03 8.58
1970-01-04 5.928571428571429
1970-01-05 7.5325
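(As a quick check: the 1970-01-02 value averages the five records from 1970-01-01 and 1970-01-02, i.e. (10.0 + 5.0 + 7.0 + 14.0 + 13.9) / 5 = 9.98.)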

While this isn't exactly what you were trying to do, it shows how you can use a DataFrame self-join to extract running averages from a data set. Hope you found this helpful.
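To bridge back to the matrix-valued RDD in the question, here is a minimal sketch, assuming integer day keys and matrices flattened to Array[Double]; rather than a self-join, it replicates each record into every window that should contain it and then reduces per window:

import org.apache.spark.rdd.RDD

// Replicate each (day, matrix) record into every window containing it,
// then average element-wise. The result key is the last day of each
// window; windows near the start of the data are partial, as above.
def slidingMatrixAvg(data: RDD[(Int, Array[Double])],
                     windowSize: Int): RDD[(Int, Array[Double])] =
  data
    .flatMap { case (day, m) =>
      (day until day + windowSize).map(end => (end, (m, 1)))
    }
    .reduceByKey { case ((m1, n1), (m2, n2)) =>
      (m1.zip(m2).map { case (x, y) => x + y }, n1 + n2)
    }
    .mapValues { case (total, n) => total.map(_ / n) }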

answered Sep 22 '22 by David Griffin