I have a huge file in HDFS containing time series data points (Yahoo stock prices).
I want to find the moving average of the time series. How do I go about writing an Apache Spark job to do that?
Method 1: Using the select() method. If we want to return the average value of one or more columns, we use the avg() method inside the select() method, specifying the column names separated by commas. Here, df is the input PySpark DataFrame and column_name is the column to get the average value from.
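As a minimal sketch of that select()/avg() pattern in Scala (the column names date and close and the sample rows are placeholders, not from the original question):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.avg

val spark = SparkSession.builder.appName("avg-example").getOrCreate()
import spark.implicits._

// Illustrative (date, close) rows
val df = Seq(("2014-01-01", 10.0), ("2014-01-02", 11.0), ("2014-01-03", 12.0))
  .toDF("date", "close")

// A single aggregate over the whole column -- a plain average, not a moving average.
df.select(avg("close")).show()

Note that this produces one overall average; a moving average needs a window, as discussed below.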
A rolling average, sometimes referred to as a moving average, is a metric that calculates trends over short periods of time using a set of data. Specifically, it helps calculate trends when they might otherwise be difficult to detect.
Moving averages are a smoothing approach that averages values from a window of consecutive time periods, thereby generating a series of averages. The moving average approaches differ primarily in the number of values averaged, how the average is computed, and how many times averaging is performed.
A moving average can be as simple as a sequence of arithmetic averages of the values in a time series. In fact, this is the definition of a simple moving average, which is the focus here. Simple arithmetic averages are computed over a window with a fixed number of periods.
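For a concrete (made-up) example, with prices 10, 11, 12, 13, 14 and a 3-period window, the simple moving averages are (10+11+12)/3 = 11, (11+12+13)/3 = 12 and (12+13+14)/3 = 13. In plain Scala (no Spark needed for data this small):

// 3-period simple moving average over a tiny in-memory series
val prices = List(10.0, 11.0, 12.0, 13.0, 14.0)
val sma = prices.sliding(3).map(w => w.sum / w.size).toList
// sma == List(11.0, 12.0, 13.0)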
Moving average is a tricky problem for Spark, and for any distributed system. When the data is spread across multiple machines, there will be some time windows that cross partitions. I think the key is duplicating data points at the start and end of partitions. I will try to think of a way to do this in Spark. – Daniel Darabos May 1, 2014 at 12:54
If you have a time series of market prices of a share, you can easily compute the moving average of the last n days using a window function. In contrast to GROUP BY clauses, where only one output row per group exists, with window functions all rows of the result set retain their identity and are shown.
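For example, Spark's DataFrame API exposes the same idea through window specifications. This is only a sketch with made-up (date, close) rows standing in for real market prices:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg

val spark = SparkSession.builder.appName("window-avg").getOrCreate()
import spark.implicits._

// Illustrative (date, close) rows
val prices = Seq(
  ("2014-01-01", 10.0), ("2014-01-02", 11.0),
  ("2014-01-03", 12.0), ("2014-01-04", 13.0)
).toDF("date", "close")

// Window of the current row and the 2 preceding rows (a 3-day moving average),
// ordered by date. Unlike GROUP BY, every input row keeps its own output row.
// Note: without partitionBy, Spark pulls all rows into a single partition for this window.
val w = Window.orderBy("date").rowsBetween(-2, 0)
prices.withColumn("moving_avg", avg("close").over(w)).show()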
In a MapReduce-style approach, you would accumulate the incoming values in a list inside map(); once the list reaches the window size, compute the average and call context.write(). On each subsequent map() call, add the new value to the list, drop the oldest value, recompute the average and call context.write() again. Spark does not give you that kind of control over accumulating values within a task and managing their count.
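As a rough illustration of that accumulate-and-slide idea in plain Scala (not tied to either MapReduce or Spark; the function name is made up):

import scala.collection.mutable

// Keep a bounded buffer of the last `window` values; once it is full,
// emit an average and slide forward by dropping the oldest value.
def slidingAverages(values: Iterator[Double], window: Int): Iterator[Double] = {
  val buf = mutable.Queue.empty[Double]
  values.flatMap { v =>
    buf.enqueue(v)
    if (buf.size > window) buf.dequeue()
    if (buf.size == window) Some(buf.sum / window) else None
  }
}

// slidingAverages(Iterator(10.0, 11.0, 12.0, 13.0), 3).toList == List(11.0, 12.0)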
You can use the sliding function from MLlib, which probably does the same thing as Daniel's answer. You will have to sort the data by time before using the sliding function.
import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)                                     // windows of 3 consecutive elements
  .map(curSlice => curSlice.sum / curSlice.size)  // average of each window (integer division here)
  .collect()
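If the series is keyed by time, as in the question, one way (a sketch with made-up (timestamp, price) pairs) is to sort by the key first and then slide over the values:

import org.apache.spark.mllib.rdd.RDDFunctions._

// Hypothetical (timestamp, price) pairs; the values are placeholders.
val prices = sc.parallelize(Seq((1L, 10.0), (2L, 11.0), (3L, 12.0), (4L, 13.0)))

val movingAvgs = prices.sortByKey()   // order by time first
  .map(_._2)                          // keep only the prices
  .sliding(3)                         // windows of 3 consecutive prices
  .map(w => w.sum / w.size)

movingAvgs.collect()                  // Array(11.0, 12.0)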
Moving average is a tricky problem for Spark, and any distributed system. When the data is spread across multiple machines, there will be some time windows that cross partitions. We have to duplicate the data at the start of the partitions, so that calculating the moving average per partition gives complete coverage.
Here is a way to do this in Spark. The example data:
val ts = sc.parallelize(0 to 100, 10)
val window = 3
A simple partitioner that puts each row in the partition we specify by the key:
class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
  def numPartitions = p
  def getPartition(key: Any) = key.asInstanceOf[Int]
}
Create the data with the first window - 1 rows copied to the previous partition:
val partitioned = ts.mapPartitionsWithIndex((i, p) => {
  val overlap = p.take(window - 1).toArray        // first window - 1 rows of this partition
  val spill = overlap.iterator.map((i - 1, _))    // copies keyed for the previous partition
  val keep = (overlap.iterator ++ p).map((i, _))  // every row keyed for this partition
  if (i == 0) keep else keep ++ spill             // partition 0 has no previous partition to feed
}).partitionBy(new StraightPartitioner(ts.partitions.length)).values
Just calculate the moving average on each partition:
val movingAverage = partitioned.mapPartitions(p => {
  val sorted = p.toSeq.sorted
  val olds = sorted.iterator            // trails window - 1 elements behind news
  val news = sorted.iterator
  var sum = news.take(window - 1).sum   // seed the running sum with the first window - 1 values
  (olds zip news).map({ case (o, n) =>
    sum += n                            // newest value enters the window
    val v = sum                         // v is the sum of the current window
    sum -= o                            // oldest value leaves the window
    v
  })
})
Because of the duplicate segments this will have no gaps in coverage.
scala> movingAverage.collect.sameElements(3 to 297 by 3)
res0: Boolean = true
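The check passes because the code above actually produces the window sums (3, 6, ..., 297). To turn them into the moving averages themselves, one way is to divide by the window size, for example:

val averages = movingAverage.map(_.toDouble / window)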