Imagine a Spark DataFrame consisting of value observations of several variables. Each observation has a specific timestamp, and those timestamps are not aligned between variables. This is because a timestamp is generated and recorded only when the value of a variable changes.
#Variable Time Value
#852-YF-007 2016-05-10 00:00:00 0
#852-YF-007 2016-05-09 23:59:00 0
#852-YF-007 2016-05-09 23:58:00 0
Problem: I would like to put all variables onto the same frequency (for instance 10 min) using forward fill. To visualize this, I copied a page from the book "Python for Data Analysis".
Question: How to do that on a Spark DataFrame in an efficient way?
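To pin down the semantics being asked for, here is a minimal, non-distributed sketch in plain Python (no Spark). For each point on a fixed grid, it takes the last observation recorded at or before that point. The function name `asof_resample`, the use of epoch seconds for timestamps and the sample readings are illustrative assumptions, not part of the original question.

```python
from bisect import bisect_right


def asof_resample(observations, start, stop, step):
    """Forward-fill irregular observations onto a regular grid.

    observations: list of (epoch_seconds, value) pairs sorted by time.
    Returns (grid_point, value) for start, start + step, ... < stop;
    value is None for grid points before the first observation.
    """
    times = [t for t, _ in observations]
    result = []
    for grid_point in range(start, stop, step):
        # Index of the last observation whose time is <= grid_point
        i = bisect_right(times, grid_point) - 1
        result.append((grid_point, observations[i][1] if i >= 0 else None))
    return result


# Hypothetical readings of one variable, recorded only when its value changed
obs = [(0, 1.0), (130, 2.0), (1500, 3.0)]
print(asof_resample(obs, 0, 2400, 600))  # 10-minute grid
# [(0, 1.0), (600, 2.0), (1200, 2.0), (1800, 3.0)]
```

Applying this once per variable, with the same `start`, `stop` and `step`, puts all variables on a common grid.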
Question: How to do that on a Spark Dataframe in an efficient way?
A Spark DataFrame is simply not a good choice for an operation like this one. In general, SQL primitives won't be expressive enough, and the PySpark DataFrame API doesn't provide the low-level access required to implement it.
Re-sampling itself, however, can be easily expressed using epoch/timestamp arithmetic. With data like this:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max as max_, min as min_

spark = SparkSession.builder.getOrCreate()

df = (spark
    .createDataFrame([
        ("2012-06-13", 0.694), ("2012-06-20", -2.669), ("2012-06-27", 0.245)],
        ["ts", "val"])
    .withColumn("ts", col("ts").cast("date").cast("timestamp")))
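we can re-sample the input:
# Bucket size in seconds (one day)
day = 60 * 60 * 24
# Truncate each timestamp's epoch seconds down to the start of its bucket
epoch = (col("ts").cast("bigint") / day).cast("bigint") * day
with_epoch = df.withColumn("epoch", epoch)
# Bounds of the reference range
min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first()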
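The bucketing expression is plain integer arithmetic on epoch seconds. Below is a minimal Python sketch of the same computation, using the question's 10-minute frequency instead of one day. The function name `bucket_start` is illustrative. Spark's `cast("bigint")` truncates toward zero, so it matches Python's floor division `//` only for non-negative epochs (timestamps from 1970 onward).

```python
from datetime import datetime, timezone


def bucket_start(epoch_seconds, step):
    """Start of the fixed-size bucket that contains epoch_seconds.

    Mirrors (ts / step).cast("bigint") * step for non-negative epochs.
    """
    return epoch_seconds // step * step


ten_minutes = 10 * 60
t = int(datetime(2016, 5, 9, 23, 58, tzinfo=timezone.utc).timestamp())
start = bucket_start(t, ten_minutes)
print(datetime.fromtimestamp(start, tz=timezone.utc))
# 2016-05-09 23:50:00+00:00

# Before 1970, truncation and floor disagree:
print(bucket_start(-1, ten_minutes))        # -600 (floor)
print(int(-1 / ten_minutes) * ten_minutes)  # 0 (truncation, like the cast)
```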
and join with a reference range:
# Reference range
ref = spark.range(
min_epoch, max_epoch + 1, day
).toDF("epoch")
(ref
.join(with_epoch, "epoch", "left")
.orderBy("epoch")
.withColumn("ts_resampled", col("epoch").cast("timestamp"))
.show(15, False))
## +----------+---------------------+------+---------------------+
## |epoch |ts |val |ts_resampled |
## +----------+---------------------+------+---------------------+
## |1339459200|2012-06-13 00:00:00.0|0.694 |2012-06-12 02:00:00.0|
## |1339545600|null |null |2012-06-13 02:00:00.0|
## |1339632000|null |null |2012-06-14 02:00:00.0|
## |1339718400|null |null |2012-06-15 02:00:00.0|
## |1339804800|null |null |2012-06-16 02:00:00.0|
## |1339891200|null |null |2012-06-17 02:00:00.0|
## |1339977600|null |null |2012-06-18 02:00:00.0|
## |1340064000|2012-06-20 00:00:00.0|-2.669|2012-06-19 02:00:00.0|
## |1340150400|null |null |2012-06-20 02:00:00.0|
## |1340236800|null |null |2012-06-21 02:00:00.0|
## |1340323200|null |null |2012-06-22 02:00:00.0|
## |1340409600|null |null |2012-06-23 02:00:00.0|
## |1340496000|null |null |2012-06-24 02:00:00.0|
## |1340582400|null |null |2012-06-25 02:00:00.0|
## |1340668800|2012-06-27 00:00:00.0|0.245 |2012-06-26 02:00:00.0|
## +----------+---------------------+------+---------------------+
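The `spark.range` plus left join step can be mirrored in plain Python. The sketch builds the full range of bucket starts and then looks each one up among the observed buckets, leaving gaps as `None` (Spark's `null`). The epoch values are copied from the output table above. The dict lookup and the name `left_join_on_range` only illustrate the semantics; Spark does not execute the join this way.

```python
def left_join_on_range(observations, start, stop, step):
    """Pair every bucket in range(start, stop, step) with its observed value.

    observations: dict mapping bucket start (epoch seconds) -> value.
    Buckets without an observation get None, like nulls from a left join.
    """
    return [(epoch, observations.get(epoch)) for epoch in range(start, stop, step)]


day = 60 * 60 * 24
observed = {1339459200: 0.694, 1340064000: -2.669, 1340668800: 0.245}
rows = left_join_on_range(observed, min(observed), max(observed) + 1, day)
for epoch, val in rows[:3]:
    print(epoch, val)
# 1339459200 0.694
# 1339545600 None
# 1339632000 None
```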
In Spark >= 3.1 replace
col("epoch").cast("timestamp")
with
from pyspark.sql.functions import timestamp_seconds
timestamp_seconds("epoch")
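The `ts_resampled` values in the table read 02:00 rather than midnight. This is consistent with a session time zone of UTC+2, which is inferred from the output and not stated in the answer. Casting `"2012-06-13"` to a date and then to a timestamp gives local midnight, which is 22:00 UTC on the previous day. The epoch bucketing then floors in UTC. Below is a plain Python sketch of that arithmetic, with the UTC+2 offset as an assumption.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the session time zone that produced the table was UTC+2
utc_plus_2 = timezone(timedelta(hours=2))
day = 60 * 60 * 24

# "2012-06-13" cast to date and then to timestamp is local midnight
local_midnight = datetime(2012, 6, 13, tzinfo=utc_plus_2)
seconds = int(local_midnight.timestamp())  # 1339538400 = 2012-06-12 22:00 UTC
bucket = seconds // day * day              # floored in UTC: 1339459200

# Rendering the bucket start in the same zone gives the 02:00 seen above
print(datetime.fromtimestamp(bucket, tz=utc_plus_2))
# 2012-06-12 02:00:00+02:00
```

Running the session in UTC (for example via the `spark.sql.session.timeZone` setting) should make the bucket boundaries coincide with the displayed midnights.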
Using low level APIs it is possible to fill data like this as I've shown in my answer to Spark / Scala: forward fill with last observation. Using RDDs we could also avoid shuffling data twice (once for join, once for reordering).
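The core of an RDD-based fill is a single sequential pass that carries the last non-null value forward. Below is a minimal sketch of that per-partition logic in plain Python. It assumes the rows of a partition are already sorted by time. The name `ffill_partition` is hypothetical. Unlike a complete solution, it does not carry the last value across partition boundaries. A generator of this shape could be passed to `RDD.mapPartitions`.

```python
def ffill_partition(rows):
    """Forward-fill (epoch, value) pairs, replacing None with the last seen value.

    Rows must be sorted by epoch; leading Nones stay None because there is
    nothing earlier in this partition to fill them from.
    """
    last = None
    for epoch, value in rows:
        if value is not None:
            last = value
        yield epoch, last


rows = [(0, 0.694), (1, None), (2, None), (3, -2.669), (4, None)]
print(list(ffill_partition(rows)))
# [(0, 0.694), (1, 0.694), (2, 0.694), (3, -2.669), (4, -2.669)]
```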
But there is a much more important problem here. Spark performs optimally when a problem can be reduced to element-wise or partition-wise computations. Forward fill is one of the cases where this is possible. As far as I am aware, however, this is typically not true of commonly used time series models, and if some operation requires sequential access, Spark won't provide any benefit at all.
So if you work with series that are large enough to require a distributed data structure, you'll probably want to aggregate them into some object that can be easily handled by a single machine, and then use your favorite non-distributed tool to handle the rest.
If you work with multiple time series where each can be handled in memory, then there is of course sparkts, but I know you're already aware of that.
I once answered a similar question. It's a bit of a hack, but the idea makes sense in your case: map every value onto a list of timestamps, then flatten the lists vertically so there is one row per timestamp.
From: Inserting records in a spark dataframe:
You can generate timestamp ranges, flatten them and select rows:
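import pyspark.sql.functions as func
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, ArrayType

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

a = sc.parallelize([[670098928, 50], [670098930, 53], [670098934, 55]])\
    .toDF(['timestamp', 'price'])
# Expand every timestamp into the 5 consecutive seconds it should cover
f = func.udf(lambda x: list(range(x, x + 5)), ArrayType(IntegerType()))
a.withColumn('timestamp', f(a.timestamp))\
    .withColumn('timestamp', func.explode(func.col('timestamp')))\
    .groupBy('timestamp')\
    .agg(func.max(func.col('price')))\
    .orderBy('timestamp')\
    .show()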
+---------+----------+
|timestamp|max(price)|
+---------+----------+
|670098928| 50|
|670098929| 50|
|670098930| 53|
|670098931| 53|
|670098932| 53|
|670098933| 53|
|670098934| 55|
|670098935| 55|
|670098936| 55|
|670098937| 55|
|670098938| 55|
+---------+----------+
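Taking `max(price)` reproduces a forward fill only when prices never decrease. Where two expanded ranges overlap, the larger price wins rather than the more recent one. Below is a plain Python sketch of the same expand-and-group idea that keeps the value from the latest original timestamp instead. The function names, the window width of 5 and the falling-price series are illustrative. In Spark, a comparable effect should be achievable by aggregating `max` over a struct of (original timestamp, price), since structs compare field by field.

```python
from collections import defaultdict


def expand_and_max(observations, width=5):
    """The hack as written: expand each ts to ts..ts+width-1, keep the max value."""
    candidates = defaultdict(list)
    for ts, value in observations:
        for t in range(ts, ts + width):
            candidates[t].append(value)
    return {t: max(vals) for t, vals in sorted(candidates.items())}


def expand_and_fill(observations, width=5):
    """Same expansion, but overlaps are resolved by the most recent original ts."""
    candidates = defaultdict(list)
    for ts, value in observations:
        for t in range(ts, ts + width):
            # Remember which original timestamp produced this candidate
            candidates[t].append((ts, value))
    # max over (original ts, value) tuples picks the latest observation
    return {t: max(pairs)[1] for t, pairs in sorted(candidates.items())}


# Hypothetical series where the price drops
obs = [(670098928, 55), (670098930, 50)]
print(expand_and_max(obs)[670098930])   # 55: the older, higher price wins
print(expand_and_fill(obs)[670098930])  # 50: the latest observation wins
```

On the rising prices from the answer's example, both versions agree with the table above.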