
Apache Spark Moving Average

I have a huge file in HDFS having Time Series data points (Yahoo Stock prices).

I want to find the moving average of the time series. How do I go about writing an Apache Spark job to do that?

asked May 01 '14 by Ahmed Shabib

People also ask

How do you find the average of a column in spark DataFrame?

Method 1: using the select() method. If we want to return the average value from multiple columns, we have to use the avg() method inside the select() method, specifying the column names separated by commas. Here, df is the input PySpark DataFrame and column_name is the column to get the average value from.
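That snippet describes the PySpark API; a minimal sketch of the same pattern in Spark's Scala API, assuming a DataFrame df with a numeric column named price (both names are hypothetical):

import org.apache.spark.sql.functions.avg

// average of a single column; add more avg(...) calls to the select
// to average several columns at once
df.select(avg("price")).show()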

What is the difference between rolling and moving average?

A rolling average, sometimes referred to as a moving average, is a metric that calculates trends over short periods of time using a set of data. Specifically, it helps calculate trends when they might otherwise be difficult to detect.

What is a moving average in R?

A moving average is a smoothing approach that averages values from a window of consecutive time periods, thereby generating a series of averages. Moving average approaches differ primarily in the number of values averaged, how the average is computed, and how many times averaging is performed.

What is moving average in data mining?

A moving average can be as simple as a sequence of arithmetic averages for the values in a time series. In fact, this is the definition of a simple moving average, which is the focus of this tip. Simple arithmetic averages are computed over a window with a fixed number of periods.
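As a concrete illustration of that definition, in plain Scala (no Spark needed): a window of size 3 sliding over five values yields three averages.

val values = Seq(2.0, 4.0, 6.0, 8.0, 10.0)
val sma = values.sliding(3).map(w => w.sum / w.size).toList
// sma == List(4.0, 6.0, 8.0)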

Is it possible to do a moving average in spark?

Moving average is a tricky problem for Spark, and any distributed system. When the data is spread across multiple machines, there will be some time windows that cross partitions. I think the key is duplicating data points at the start and end of partitions. I will try to think of a way to do this in Spark. – Daniel Darabos, May 1, 2014 at 12:54

How to get the moving average of the last n days?

If you have the time series of market prices of a share, you can easily compute the moving average of the last n days. In contrast to GROUP BY clauses, where only one output row exists per group, with window functions all rows of the result set retain their identity and are shown.
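In Spark that idea maps onto DataFrame window functions (added in Spark 1.4, so after this question was asked). A hedged sketch, assuming a DataFrame df with columns named date and price (hypothetical names) and a window of n rows:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col}

val n = 3
// the current row plus the n - 1 preceding rows, ordered by date
val w = Window.orderBy("date").rowsBetween(-(n - 1), 0)
val withMovingAvg = df.withColumn("moving_avg", avg(col("price")).over(w))

Note that without a partitionBy clause this pulls all rows into a single partition, which is exactly the scaling problem the answers below work around.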

How to add a value to a list in spark?

Keep adding values to a list inside the Map() method; once the window size is reached, compute the average and call context.write(). On the next Map() call, add the new value to the list, delete the oldest value, recompute the average, and call context.write() again. Spark, however, does not give you that kind of control over accumulating values within a task and managing their count.
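A minimal sketch of the per-record accumulation logic described there, in plain Scala (the buffer and the update helper are hypothetical names standing in for the mapper's list):

import scala.collection.mutable

val window = 3
val buffer = mutable.Queue.empty[Double]

// mimics one Map() call: push the new value, evict the oldest,
// and emit an average once the buffer holds a full window
def update(value: Double): Option[Double] = {
  buffer.enqueue(value)
  if (buffer.size > window) buffer.dequeue()
  if (buffer.size == window) Some(buffer.sum / window) else None
}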


2 Answers

You can use the sliding function from MLlib, which probably does the same thing as Daniel's answer. Note that you will have to sort the data by time before using sliding.

import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()
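For real time-series data you would sort first, as noted above. A hedged sketch with hypothetical (timestamp, price) pairs:

import org.apache.spark.mllib.rdd.RDDFunctions._

// hypothetical (timestamp, price) pairs, deliberately out of order
val prices = sc.parallelize(Seq((3L, 11.0), (1L, 10.0), (2L, 12.0), (4L, 13.0)))

prices.sortByKey()        // order by time
  .map(_._2)              // keep just the prices
  .sliding(3)
  .map(w => w.sum / w.size)
  .collect()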
answered Oct 23 '22 by Arvind

Moving average is a tricky problem for Spark, and any distributed system. When the data is spread across multiple machines, there will be some time windows that cross partitions. We have to duplicate the data at the start of the partitions, so that calculating the moving average per partition gives complete coverage.

Here is a way to do this in Spark. The example data:

val ts = sc.parallelize(0 to 100, 10)
val window = 3

A simple partitioner that puts each row in the partition we specify by the key:

class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
  def numPartitions = p
  def getPartition(key: Any) = key.asInstanceOf[Int]
}

Create the data with the first window - 1 rows of each partition copied to the previous partition:

val partitioned = ts.mapPartitionsWithIndex((i, p) => {
  // the first window - 1 rows of this partition
  val overlap = p.take(window - 1).toArray
  // send a copy of them to the previous partition...
  val spill = overlap.iterator.map((i - 1, _))
  // ...and also keep them here, ahead of the remaining rows
  val keep = (overlap.iterator ++ p).map((i, _))
  if (i == 0) keep else keep ++ spill
}).partitionBy(new StraightPartitioner(ts.partitions.length)).values

Just calculate the moving average on each partition:

val movingAverage = partitioned.mapPartitions(p => {
  val sorted = p.toSeq.sorted
  val olds = sorted.iterator
  val news = sorted.iterator
  // prime the running total with the first window - 1 values
  var sum = news.take(window - 1).sum
  (olds zip news).map({ case (o, n) => {
    sum += n   // add the newest value
    val v = sum // the full window sum (not yet divided by window)
    sum -= o   // drop the oldest value
    v
  }})
})

Because of the duplicated segments this will have no gaps in coverage. Note that the values emitted are window sums: for windows of size 3 over 0 to 100 they are 3, 6, ..., 297, which is what the check below verifies.

scala> movingAverage.collect.sameElements(3 to 297 by 3)
res0: Boolean = true
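If you want the averages themselves, one extra map (a small addition, not part of the original answer) divides each window sum by the window size:

val averages = movingAverage.map(_.toDouble / window)
// yields 1.0, 2.0, ..., 99.0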
answered Oct 23 '22 by Daniel Darabos