
How to transform data with sliding window over time series data in Pyspark

I am trying to extract features based on a sliding window over time series data. In Scala, it seems there is a sliding function, based on this post and the documentation:

import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()

My question is: is there a similar function in PySpark? Or, if no such function exists yet, how can we achieve similar sliding window transformations?

asked Jul 28 '15 19:07 by Bin

People also ask

What are the benefits of sliding window operations in Spark?

In networking, a sliding window controls the transmission of data packets between machines. In Spark, the Streaming library provides windowed computations, where transformations on RDDs are applied over a sliding window of data.
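
For illustration, a minimal Spark Streaming sketch might look like this (assuming an existing SparkContext sc and a hypothetical socket source; the host and port are illustrative only):

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, batchDuration=1)        # 1-second micro-batches
lines = ssc.socketTextStream("localhost", 9999)    # hypothetical source

# Count the records seen in the last 30 seconds, recomputed every 10 seconds
lines.window(windowDuration=30, slideDuration=10).count().pprint()

ssc.start()
ssc.awaitTermination()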

What does .collect do in PySpark?

collect() is an action on an RDD or DataFrame that retrieves the data to the driver. It gathers all the elements from every partition and brings them back to the driver node/program.
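
A small sketch, assuming an existing SparkContext sc and SparkSession spark:

rdd = sc.parallelize(range(5))
print(rdd.collect())              # [0, 1, 2, 3, 4] -- all elements on the driver

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
rows = df.collect()               # list of Row objects pulled to the driver
print(rows[0]["value"])           # 'a'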

How does window work in PySpark?

A PySpark window function performs operations such as rank and row number over a group, frame, or collection of rows, and returns a result for each row individually. Window functions are also widely used for data transformations.
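
A minimal sketch of a ranking window function over a toy DataFrame (names and values are illustrative):

from pyspark.sql import Window
from pyspark.sql import functions as f

df = spark.createDataFrame(
    [("a", 10), ("a", 30), ("b", 20)], ["group", "value"])

# Number the rows within each group, ordered by value
w = Window.partitionBy("group").orderBy("value")
df.withColumn("row_number", f.row_number().over(w)).show()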

How do you use AGG in PySpark?

groupBy() returns a pyspark.sql.GroupedData object, which provides an agg() method to perform aggregations on the grouped DataFrame. After performing the aggregation, agg() returns a PySpark DataFrame. To use aggregate functions such as sum(), avg(), min(), and max(), import them from pyspark.sql.functions.
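
A small sketch over a toy DataFrame (names and values are illustrative):

from pyspark.sql import functions as f

df = spark.createDataFrame(
    [("a", 10), ("a", 30), ("b", 20)], ["group", "value"])

# Aggregate each group with sum() and avg()
df.groupBy("group").agg(
    f.sum("value").alias("total"),
    f.avg("value").alias("mean"),
).show()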


2 Answers

To add to venuktan's answer, here is how to create a time-based sliding window using Spark SQL and retain the full contents of the window, rather than taking an aggregate of it. This was needed in my use case of preprocessing time series data into sliding windows for input into Spark ML.

One limitation of this approach is that we assume you want to take sliding windows over time.

Firstly, you may create your Spark DataFrame, for example by reading in a CSV file:

df = spark.read.csv('foo.csv')

We assume that your CSV file has two columns: one containing a Unix timestamp and the other containing the values you want to extract sliding windows from.
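
If you don't have a CSV file handy, a toy DataFrame with the assumed shape could stand in (column names chosen to match what spark.read.csv produces by default; the values are illustrative):

df = spark.createDataFrame(
    [(1438112400, 1.0),
     (1438112401, 2.0),
     (1438112402, 3.0),
     (1438112403, 4.0)],
    ["_c0", "_c1"])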

from pyspark.sql import functions as f

window_duration = '1000 millisecond'
slide_duration = '500 millisecond'

windowed_df = (
    df
    # Convert the unix timestamp (seconds) into a timestamp string Spark can window over
    .withColumn("_c0", f.from_unixtime(f.col("_c0")))
    # Group rows into overlapping time-based windows
    .groupBy(f.window("_c0", window_duration, slide_duration))
    # Keep the full contents of each window rather than an aggregate
    .agg(f.collect_list(f.array('_c1')))
    .withColumnRenamed('collect_list(array(_c1))', 'sliding_window')
)

Bonus: to convert this array column to the DenseVector format required for Spark ML, see the UDF approach here.
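
For example, a hedged sketch of such a UDF (assuming the windowed result above is assigned to a DataFrame named windowed_df, and that the values can be cast to float):

from pyspark.sql import functions as f
from pyspark.ml.linalg import Vectors, VectorUDT

# Flatten each window (a list of single-element arrays) into one DenseVector
to_vector = f.udf(
    lambda arrays: Vectors.dense([float(v) for arr in arrays for v in arr]),
    VectorUDT())

df_ml = windowed_df.withColumn("features", to_vector(f.col("sliding_window")))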

Extra Bonus: to un-nest the resulting column, such that each element of your sliding window has its own column, try this approach here.
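
A hedged sketch of the un-nesting, again assuming the windowed_df DataFrame from above and that you know (or cap) the number of elements per window; window_size and the column names are illustrative:

from pyspark.sql import functions as f

window_size = 3   # hypothetical: the expected number of elements per window
unnested = windowed_df.select(
    "window",
    *[f.col("sliding_window")[i].alias("element_{}".format(i))
      for i in range(window_size)])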

I hope this helps, please let me know if I can clarify anything.

answered Oct 06 '22 03:10 by Shane Halloran


As far as I can tell, the sliding function is not available from Python, and SlidingRDD is a private class that cannot be accessed outside MLlib.

If you want to use sliding on an existing RDD, you can create a poor man's sliding like this:

def sliding(rdd, n):
    assert n > 0
    def gen_window(xi, n):
        x, i = xi
        return [(i - offset, (i, x)) for offset in range(n)]

    return (
        rdd.
        zipWithIndex(). # Add index
        flatMap(lambda xi: gen_window(xi, n)). # Generate pairs with offset
        groupByKey(). # Group to create windows
        # Sort values to ensure order inside window and drop indices
        mapValues(lambda vals: [x for (i, x) in sorted(vals)]).
        sortByKey(). # Sort to make sure we keep original order
        values(). # Get values
        filter(lambda x: len(x) == n)) # Drop incomplete windows at beginning and end
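
A usage sketch mirroring the Scala example in the question (a 3-element moving average), assuming an active SparkContext sc:

rdd = sc.parallelize(range(1, 101), 10)
averages = (sliding(rdd, 3)
            .map(lambda window: sum(window) / len(window))  # mean of each window
            .collect())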

Alternatively, you can try something like this (with a little help from toolz):

from toolz.itertoolz import sliding_window, concat

def sliding2(rdd, n):
    assert n > 1

    def get_last_el(i, iter):
        """Return last n - 1 elements from the partition"""
        return [(i, [x for x in iter][(-n + 1):])]

    def slide(i, iter):
        """Prepend previous items and return sliding window"""
        return sliding_window(n, concat([last_items.value[i - 1], iter]))

    def clean_last_items(last_items):
        """Adjust for empty or too small partitions"""
        clean = {-1: [None] * (n - 1)}
        for i in range(rdd.getNumPartitions()):
            clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):]
        return {k: tuple(v) for k, v in clean.items()}

    last_items = sc.broadcast(clean_last_items(
        rdd.mapPartitionsWithIndex(get_last_el).collectAsMap()))

    return rdd.mapPartitionsWithIndex(slide)
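
A usage sketch for this partition-aware variant, again assuming an active SparkContext sc; note that the first partition's leading windows contain the None sentinels used as padding:

rdd = sc.parallelize(range(1, 101), 10)
windows = sliding2(rdd, 3).collect()   # tuples of length 3
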
answered Oct 06 '22 05:10 by zero323