I am trying to extract features based on a sliding window over time series data. In Scala, it seems there is a sliding function, based on this post and the documentation:
import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()
My question is: is there a similar function in PySpark? And if there is no such function yet, how do we achieve a similar sliding window transformation?
To add to venuktan's answer, here is how to create a time-based sliding window using Spark SQL and retain the full contents of the window, rather than taking an aggregate of it. This was needed in my use case of preprocessing time series data into sliding windows for input into Spark ML.
One limitation of this approach is that we assume you want to take sliding windows over time.
Firstly, you may create your Spark DataFrame, for example by reading in a CSV file:
df = spark.read.csv('foo.csv')
We assume that your CSV file has two columns: one is a Unix timestamp and the other is the column you want to extract sliding windows from.
from pyspark.sql import functions as f

window_duration = '1000 millisecond'
slide_duration = '500 millisecond'

# Interpret _c0 as a timestamp, group rows into overlapping time windows,
# and keep the full contents of each window rather than an aggregate.
df.withColumn("_c0", f.from_unixtime(f.col("_c0"))) \
    .groupBy(f.window("_c0", window_duration, slide_duration)) \
    .agg(f.collect_list(f.array('_c1'))) \
    .withColumnRenamed('collect_list(array(_c1))', 'sliding_window')
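In case it helps, here is a minimal, self-contained sketch of the same approach that you can run without a CSV; the toy DataFrame, the 3-second/1-second durations, and the variable name windowed are illustrative assumptions (the _c0/_c1 names match what spark.read.csv produces for a headerless file):

from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for the CSV: _c0 is a Unix timestamp in seconds, _c1 is the value column.
df = spark.createDataFrame(
    [(1500000000 + i, float(i)) for i in range(10)],
    ["_c0", "_c1"],
)

windowed = df.withColumn("_c0", f.from_unixtime(f.col("_c0"))) \
    .groupBy(f.window("_c0", "3 second", "1 second")) \
    .agg(f.collect_list(f.array("_c1"))) \
    .withColumnRenamed("collect_list(array(_c1))", "sliding_window")

windowed.show(truncate=False)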
Bonus: to convert this array column to the DenseVector format required for Spark ML, see the UDF approach here.
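Since the link may not survive, the UDF approach looks roughly like this (a sketch, not the exact linked code; it assumes the windowed DataFrame and sliding_window column from the snippet above, i.e. an array of one-element arrays):

from pyspark.sql import functions as f
from pyspark.ml.linalg import Vectors, VectorUDT

# Illustrative UDF: flatten the array-of-arrays and pack it into a DenseVector.
to_dense = f.udf(
    lambda rows: Vectors.dense([float(x) for row in rows for x in row]),
    VectorUDT(),
)

with_features = windowed.withColumn("features", to_dense(f.col("sliding_window")))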
Extra Bonus: to un-nest the resulting column, such that each element of your sliding window has its own column, try this approach here.
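One way to do that un-nesting, sketched under the same assumptions (window_size is a hypothetical, fixed window length; shorter windows at the edges come out as nulls):

from pyspark.sql import functions as f

window_size = 3  # illustrative: expected number of elements per window

unnested = windowed
for j in range(window_size):
    # Each window element is itself a one-element array, hence the double getItem.
    unnested = unnested.withColumn(
        "t{}".format(j),
        f.col("sliding_window").getItem(j).getItem(0),
    )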
I hope this helps; please let me know if I can clarify anything.
As far as I can tell, the sliding function is not available from Python, and SlidingRDD is a private class that cannot be accessed outside MLlib.

If you want to use sliding on an existing RDD, you can create a poor man's sliding like this:
def sliding(rdd, n):
    assert n > 0

    def gen_window(xi, n):
        x, i = xi
        return [(i - offset, (i, x)) for offset in range(n)]

    return (
        rdd.
        zipWithIndex().  # Add index
        flatMap(lambda xi: gen_window(xi, n)).  # Generate pairs with offset
        groupByKey().  # Group to create windows
        # Sort values to ensure order inside window and drop indices
        mapValues(lambda vals: [x for (i, x) in sorted(vals)]).
        sortByKey().  # Sort to make sure we keep original order
        values().  # Get values
        filter(lambda x: len(x) == n))  # Drop incomplete windows at the beginning and end
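For a quick sanity check, this mirrors the Scala snippet from the question (assuming an active SparkContext named sc):

averages = (
    sliding(sc.parallelize(range(1, 101), 10), 3)
    .map(lambda window: sum(window) / len(window))
    .collect()
)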
Alternatively, you can try something like this (with a little help from toolz):
from toolz.itertoolz import sliding_window, concat

def sliding2(rdd, n):
    assert n > 1

    def get_last_el(i, iter):
        """Return last n - 1 elements from the partition"""
        return [(i, [x for x in iter][(-n + 1):])]

    def slide(i, iter):
        """Prepend previous items and return sliding window"""
        return sliding_window(n, concat([last_items.value[i - 1], iter]))

    def clean_last_items(last_items):
        """Adjust for empty or too small partitions"""
        clean = {-1: [None] * (n - 1)}
        for i in range(rdd.getNumPartitions()):
            clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):]
        return {k: tuple(v) for k, v in clean.items()}

    last_items = sc.broadcast(clean_last_items(
        rdd.mapPartitionsWithIndex(get_last_el).collectAsMap()))

    return rdd.mapPartitionsWithIndex(slide)