I have a problem dealing with time-series data. Due to power failures, some timestamps are missing from the dataset. I need to fill these gaps by adding rows, and after that I can interpolate the missing values.
Input data:
periodstart usage
---------------------------------
2015-09-11 02:15 23000
2015-09-11 03:15 23344
2015-09-11 03:30 23283
2015-09-11 03:45 23786
2015-09-11 04:00 25039
Wanted output:
periodstart usage
---------------------------------
2015-09-11 02:15 23000
2015-09-11 02:30 0
2015-09-11 02:45 0
2015-09-11 03:00 0
2015-09-11 03:15 23344
2015-09-11 03:30 23283
2015-09-11 03:45 23786
2015-09-11 04:00 25039
For now I have solved this with a while loop inside a foreach over the dataset. The problem is that I have to collect the dataset to the driver first before I can run the while loop, so this is not the right approach for Spark.
Can someone give me a better solution?
This is my code:
import java.sql.Timestamp
import scala.collection.mutable.ListBuffer

MissingMeasurementsDS.collect().foreach(row => {
  // Empty list for newly generated measurements
  val output = ListBuffer.empty[Measurement]
  // Number of missing measurements after this row
  val missingMeasurements = row.getAs[Int]("missingmeasurements")
  val lastTimestamp = row.getAs[Timestamp]("previousperiodstart")
  // Generate the missing timestamps
  var i = 1
  while (i <= missingMeasurements) {
    // Increment the timestamp by 15 minutes (900000 milliseconds)
    val newTimestamp = lastTimestamp.getTime + (900000 * i)
    output += Measurement(new Timestamp(newTimestamp), 0)
    i += 1
  }
  // Join the generated measurements with the correct measurements
  completeMeasurementsDS.join(output.toDS())
})
completeMeasurementsDS.show()
println("OutputDF count = " + completeMeasurementsDS.count())
If the input DataFrame has the following structure:
root
|-- periodstart: timestamp (nullable = true)
|-- usage: long (nullable = true)
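For reference, a minimal sketch that builds a DataFrame with this schema from the sample data in the question (assuming an active SparkSession named spark):

import spark.implicits._

// Sample rows from the question; the string column is cast to timestamp
val df = Seq(
  ("2015-09-11 02:15:00", 23000L),
  ("2015-09-11 03:15:00", 23344L),
  ("2015-09-11 03:30:00", 23283L),
  ("2015-09-11 03:45:00", 23786L),
  ("2015-09-11 04:00:00", 25039L)
).toDF("periodstart", "usage")
  .withColumn("periodstart", $"periodstart".cast("timestamp"))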
Scala
Determine min / max:
import org.apache.spark.sql.functions.{max, min}

val (minp, maxp) = df
  .select(min($"periodstart").cast("bigint"), max($"periodstart").cast("bigint"))
  .as[(Long, Long)]
  .first
Set the step, for example 15 minutes (in seconds):
val step: Long = 15 * 60
Generate the reference range (the integer division rounds minp down and maxp up to a multiple of the step, so the range covers both endpoints):
val reference = spark
  .range((minp / step) * step, ((maxp / step) + 1) * step, step)
  .select($"id".cast("timestamp").alias("periodstart"))
Join and fill the gaps:
reference.join(df, Seq("periodstart"), "leftouter").na.fill(0, Seq("usage"))
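On the sample data this should reproduce the wanted output; since join output order is not guaranteed, sort before displaying:

reference.join(df, Seq("periodstart"), "leftouter")
  .na.fill(0, Seq("usage"))
  .orderBy("periodstart")
  .show()

This should list 02:15 through 04:00 in 15-minute steps, with usage 0 for the previously missing 02:30, 02:45 and 03:00 rows.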
Python
Similarly in PySpark:
from pyspark.sql.functions import col, min as min_, max as max_
step = 15 * 60
minp, maxp = df.select(
    min_("periodstart").cast("long"), max_("periodstart").cast("long")
).first()
# Integer division (//) keeps the range bounds integral in Python 3
reference = spark.range(
    (minp // step) * step, ((maxp // step) + 1) * step, step
).select(col("id").cast("timestamp").alias("periodstart"))
reference.join(df, ["periodstart"], "leftouter").na.fill(0, ["usage"])