I've been working quite a lot with Apache Spark over the last few months, but now I have received a pretty difficult task: computing the average/minimum/maximum and so on over a sliding window on a pair RDD where the key component is a date tag and the value component is a matrix. Each aggregation function should therefore also return a matrix, where each cell holds the aggregate (e.g. the average) of that cell over all matrices in the time period.
I want to be able to say, for example, that I want the average over every 7 days, with a sliding window of one day. The window always slides by one, in the same unit as the window size (so if the window is 12 weeks, it slides by one week).
My initial thought is to simply iterate X times (if we want an average per X days), and for each iteration group the elements by their date, with an offset.
So if we have this scenario:
Days: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
Matrices: A B C D E F G H I J K L M N O
And we want the average per 5 days, I would iterate 5 times; the grouping for each iteration looks like this:
First iteration:
Group 1: (1, A) (2, B) (3, C) (4, D) (5, E)
Group 2: (6, F) (7, G) (8, H) (9, I) (10, J)
Group 3: (11, K) (12, L) (13, M) (14, N) (15, O)
Second iteration:
Group 1: (2, B) (3, C) (4, D) (5, E) (6, F)
Group 2: (7, G) (8, H) (9, I) (10, J) (11, K)
Group 3: (12, L) (13, M) (14, N) (15, O)
And so on. For each group, I then have to do a fold/reduce procedure to get the average matrix.
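A rough sketch of that naive approach might look like the following (my own sketch; it assumes integer day keys starting at 1 and matrices stored as Array[Array[Double]], since the question doesn't pin those types down):

import org.apache.spark.rdd.RDD

// Assumption (not from the question): days are Int indices starting at 1
// and a matrix is an Array[Array[Double]].
type Matrix = Array[Array[Double]]

def addMatrices(a: Matrix, b: Matrix): Matrix =
  a.zip(b).map { case (ra, rb) => ra.zip(rb).map { case (x, y) => x + y } }

def divMatrix(a: Matrix, d: Double): Matrix = a.map(_.map(_ / d))

// One pass per offset: bucket each (day, matrix) pair into a window id,
// then reduce each bucket into an element-wise average.
def naiveWindowAverages(data: RDD[(Int, Matrix)], windowSize: Int): Seq[RDD[(Int, Matrix)]] =
  (0 until windowSize).map { offset =>
    data
      .filter { case (day, _) => day > offset }                      // skip days before this offset
      .map { case (day, m) => ((day - 1 - offset) / windowSize, (m, 1)) }
      .reduceByKey { case ((m1, n1), (m2, n2)) => (addMatrices(m1, m2), n1 + n2) }
      .mapValues { case (total, n) => divMatrix(total, n.toDouble) } // per-cell average
  }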
However, as you might imagine, this is pretty slow and probably a rather bad way to do it. I can't really figure out a better way, though.
If you convert to a DataFrame, this all gets a lot simpler -- you can just self-join the data back on itself and find the average. Say I have a series of data like this:
tsDF.show
date amount
1970-01-01 10.0
1970-01-01 5.0
1970-01-01 7.0
1970-01-02 14.0
1970-01-02 13.9
1970-01-03 1.0
1970-01-03 5.0
1970-01-03 9.0
1970-01-04 9.0
1970-01-04 5.8
1970-01-04 2.8
1970-01-04 8.9
1970-01-05 8.1
1970-01-05 2.1
1970-01-05 2.78
1970-01-05 20.78
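For anyone who wants to reproduce this, a DataFrame like tsDF could be built along these lines. This construction is my own sketch, not part of the original answer; it assumes a spark-shell-style sqlContext with its implicits imported:

import java.sql.Date
import sqlContext.implicits._             // assumed SQLContext; enables toDF and the $"..." syntax
import org.apache.spark.sql.functions._   // sum, count, avg, udf used below

val tsDF = Seq(
  (Date.valueOf("1970-01-01"), 10.0),
  (Date.valueOf("1970-01-01"), 5.0),
  (Date.valueOf("1970-01-01"), 7.0),
  (Date.valueOf("1970-01-02"), 14.0),
  (Date.valueOf("1970-01-02"), 13.9),
  (Date.valueOf("1970-01-03"), 1.0),
  (Date.valueOf("1970-01-03"), 5.0),
  (Date.valueOf("1970-01-03"), 9.0),
  (Date.valueOf("1970-01-04"), 9.0),
  (Date.valueOf("1970-01-04"), 5.8),
  (Date.valueOf("1970-01-04"), 2.8),
  (Date.valueOf("1970-01-04"), 8.9),
  (Date.valueOf("1970-01-05"), 8.1),
  (Date.valueOf("1970-01-05"), 2.1),
  (Date.valueOf("1970-01-05"), 2.78),
  (Date.valueOf("1970-01-05"), 20.78)
).toDF("date", "amount")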
Which rolls up as:
tsDF.groupBy($"date").agg(sum($"amount"), count($"date")).show
date SUM(amount) COUNT(date)
1970-01-01 22.0 3
1970-01-02 27.9 2
1970-01-03 15.0 3
1970-01-04 26.5 4
1970-01-05 33.76 4
I would then need to create a UDF to shift the date for the join condition (note that I am only using a 2-day window here, via offset = -2):
import java.util.Calendar
import org.apache.spark.sql.functions.udf

// Shift a date back by two days (offset = -2) for the window's lower bound.
def dateShift(myDate: java.sql.Date): java.sql.Date = {
  val offset = -2
  val cal = Calendar.getInstance
  cal.setTime(myDate)
  cal.add(Calendar.DATE, offset)
  new java.sql.Date(cal.getTime.getTime)
}

val udfDateShift = udf[java.sql.Date, java.sql.Date](dateShift)
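A quick way to sanity-check the shift (my own addition, with a hypothetical column alias) is to apply the UDF to the date column directly:

// Each date should come back shifted two days earlier.
tsDF.select($"date", udfDateShift($"date") as "date_minus_2").show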
And then I could easily find a 2-day rolling average like this:
val windowDF = tsDF.select($"date").distinct()
  .join(
    tsDF.select($"date" as "r_date", $"amount" as "r_amount"),
    $"r_date" > udfDateShift($"date") and $"r_date" <= $"date"
  )
  .groupBy($"date")
  .agg(avg($"r_amount") as "2 day avg amount / record")

windowDF.show
date 2 day avg amount / record
1970-01-01 7.333333333333333
1970-01-02 9.98
1970-01-03 8.58
1970-01-04 5.928571428571429
1970-01-05 7.5325
While this isn't exactly what you were trying to do, it shows how you can use a DataFrame self-join to extract running averages from a data set. Hope you found this helpful.
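As an aside that goes beyond the original answer: newer Spark versions also provide built-in window functions, which can express the same two-day range without a UDF or a self-join. A rough sketch of that alternative, assuming the same tsDF:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Collapse to one row per date, then use a range frame of
// "the previous day and the current day" over an integer day index.
val daily = tsDF.groupBy($"date")
  .agg(sum($"amount") as "day_sum", count($"amount") as "day_cnt")

val dayNum = datediff($"date", lit("1970-01-01").cast("date"))  // days since the epoch
val w = Window.orderBy(dayNum).rangeBetween(-1, 0)              // this day plus the one before

val rollingDF = daily
  .withColumn("2 day avg amount / record",
    sum($"day_sum").over(w) / sum($"day_cnt").over(w))
  .select($"date", $"2 day avg amount / record")

rollingDF.show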