I am writing a Spark Structured Streaming program. I need to create an additional column with the lag difference.
To reproduce my issue, I provide the code snippet below. This code consumes the data.json file stored in the data folder:
[
{"id": 77,"type": "person","timestamp": 1532609003},
{"id": 77,"type": "person","timestamp": 1532609005},
{"id": 78,"type": "crane","timestamp": 1532609005}
]
Code:
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql.window import Window
from pyspark.sql.types import *
spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[2]") \
    .getOrCreate()
schema = StructType([
    StructField("id", IntegerType()),
    StructField("type", StringType()),
    StructField("timestamp", LongType())
])
ds = spark \
    .readStream \
    .format("json") \
    .schema(schema) \
    .load("data/")
diff_window = Window.partitionBy("id").orderBy("timestamp")
ds = ds.withColumn("prev_timestamp", func.lag(ds.timestamp).over(diff_window))
query = ds \
    .writeStream \
    .format('console') \
    .start()
query.awaitTermination()
I get this error:
pyspark.sql.utils.AnalysisException: u'Non-time-based windows are not supported on streaming DataFrames/Datasets;;\nWindow [lag(timestamp#71L, 1, null) windowspecdefinition(host_id#68, timestamp#71L ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) AS prev_timestamp#129L]
Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, all the APIs are optimized by Spark's Catalyst optimizer and translated into RDDs for execution under the hood.
Watermarking is a feature in Spark Structured Streaming that is used to handle the data that arrives late. Spark Structured Streaming can maintain the state of the data that arrives, store it in memory, and update it accurately by aggregating it with the data that arrived late.
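To make the idea of a watermark concrete, here is a minimal pure-Python sketch. It is not Spark code and not Spark's actual implementation: the function name `apply_watermark`, the integer event times, and the rule "the watermark is the largest event time seen in earlier batches minus the allowed delay" are simplifying assumptions made for illustration.

```python
def apply_watermark(batches, delay):
    """Split event times into accepted and dropped, batch by batch.

    Simplified model (an assumption, not Spark's exact behavior):
    the watermark used for a batch is the maximum event time seen
    in all previous batches minus `delay`.
    """
    max_event_time = None
    accepted, dropped = [], []
    for batch in batches:
        # Watermark is fixed for the whole batch, based on earlier data only.
        watermark = None if max_event_time is None else max_event_time - delay
        for event_time in batch:
            if watermark is not None and event_time < watermark:
                dropped.append(event_time)   # too late, state may be gone
            else:
                accepted.append(event_time)  # still within the allowed delay
        # Advance the maximum event time after the batch has been processed.
        if batch:
            batch_max = max(batch)
            if max_event_time is None or batch_max > max_event_time:
                max_event_time = batch_max
    return accepted, dropped


# Three hypothetical micro-batches of event times (in seconds), 60 s delay.
accepted, dropped = apply_watermark([[100, 105], [170, 101], [90, 175]], delay=60)
```

In this sketch the event with time 101 is late but still accepted, because it is within 60 seconds of the largest event time seen so far, while the event with time 90 arrives after the watermark has passed it and is dropped.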
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit. Save each RDD in this DStream as a Sequence file of serialized objects. The file name at each batch interval is generated based on prefix and suffix : "prefix-TIME_IN_MS.
Window function: returns the value that is offset rows before the current row, and default if there are fewer than offset rows before the current row. For example, an offset of one will return the previous row at any given point in the window partition.
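As an illustration of what lag computes, here is a small pure-Python sketch applied to the sample rows from the question. It only mimics the semantics on an in-memory list; the helper name `lag_by_partition` and the extra `diff` field are made up for this example and are not part of the PySpark API.

```python
from collections import defaultdict


def lag_by_partition(rows, partition_key, order_key, offset=1, default=None):
    """Add 'prev_<order_key>': the value `offset` rows earlier in the same partition."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[partition_key]].append(row)

    result = []
    for key in sorted(partitions):
        # Order rows inside each partition, as orderBy does in a window spec.
        ordered = sorted(partitions[key], key=lambda r: r[order_key])
        for i, row in enumerate(ordered):
            # Fall back to `default` when there are fewer than `offset` earlier rows.
            prev = ordered[i - offset][order_key] if i >= offset else default
            result.append({**row, "prev_" + order_key: prev})
    return result


rows = [
    {"id": 77, "type": "person", "timestamp": 1532609003},
    {"id": 77, "type": "person", "timestamp": 1532609005},
    {"id": 78, "type": "crane", "timestamp": 1532609005},
]

lagged = lag_by_partition(rows, "id", "timestamp")
for row in lagged:
    prev = row["prev_timestamp"]
    # The lag difference the question asks for; None for the first row per id.
    row["diff"] = None if prev is None else row["timestamp"] - prev
```

The first row of each id has no previous row, so its prev_timestamp stays at the default; only the second row for id 77 gets a difference (2 seconds).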
pyspark.sql.utils.AnalysisException: u'Non-time-based windows are not supported on streaming DataFrames/Datasets
This means that your window must be based on a timestamp column. So if you have a data point for each second and you make a 30s window with a stride of 10s, the result gets a new window column, with start and end fields whose timestamps are 30s apart.
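To see which windows a single event falls into, here is a rough pure-Python sketch. It assumes window starts are aligned to multiples of the stride counted from the Unix epoch (no start offset) and that timestamps are whole seconds; the function name `sliding_windows` is invented for this illustration.

```python
def sliding_windows(ts, window=30, slide=10):
    """Return the [start, end) windows that contain `ts`.

    Assumes window starts are multiples of `slide` since the epoch.
    """
    # Latest window start that is not after the event.
    start = ts - ts % slide
    windows = []
    # Walk backwards while the window still covers the event.
    while start + window > ts:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


# First timestamp from the question's sample data.
windows = sliding_windows(1532609003)
```

With a 30s window and a 10s stride, each event lands in three overlapping windows, which is why a grouped result can contain several rows per event.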
You should use the window in this way:
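from pyspark.sql import functions as F
# `words` is assumed to be a streaming DataFrame with `date_time` and `value` columns.
words = words.withColumn('date_time', F.col('date_time').cast('timestamp'))
# 30-second windows that slide every 10 seconds.
w = F.window('date_time', '30 seconds', '10 seconds')
# The watermark must be set on the same event-time column the window uses.
words = words \
    .withWatermark('date_time', '1 minute') \
    .groupBy(w).agg(F.mean('value'))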