
How to use a non-time-based window with spark data streaming structure?

I am trying to use a window function in Spark Structured Streaming with Kafka. Because my window is over non-time-based data, I get this error:

'Non-time-based windows are not supported on streaming DataFrames/Datasets;;\nWindow

Here is my code:

from pyspark.sql import Window
from pyspark.sql.functions import col, rank

window = Window.partitionBy("input_id").orderBy("similarity")
outputDf = inputDf \
        .crossJoin(ticketDf.withColumnRenamed("IDF", "old_IDF")) \
        .withColumn("similarity", cosine_similarity_udf(col("IDF"), col("old_IDF"))) \
        .withColumn("rank", rank().over(window)) \
        .filter(col("rank") < 10)

So I am looking for a tip or a reference on how to use a window over non-time-based data.

Kaharon asked Oct 16 '22 16:10

2 Answers

Traditional SQL windowing with over() is not supported in Spark Structured Streaming (the only windowing it supports is time-based). If you think about it, this restriction probably exists to avoid confusion: some might falsely assume that Spark Structured Streaming can partition the whole data set by a column, which is impossible because a stream is unbounded input.

Instead, you can use groupBy(). groupBy() is also a stateful operation, which is impossible to implement in append mode unless we include a timestamp column in the list of columns we group by. For example:

from pyspark.sql import functions as F
from pyspark.sql.functions import window

# self.acceptable_time_difference is a duration string, e.g. "10 minutes"
df_result = df.withWatermark("createdAt", "10 minutes") \
              .groupBy(F.col("Id"), window(F.col("createdAt"), self.acceptable_time_difference)) \
              .agg(F.max(F.col("createdAt")).alias("maxCreatedAt"))

In this example, createdAt is a timestamp-typed column. Please note that in this case we have to call withWatermark on the timestamp column beforehand, because Spark cannot store state indefinitely.

P.S.: I know groupBy does not function exactly like windowing, but with a simple join or a custom function with mapGroupsWithState, you may be able to implement the desired functionality.
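To make the mapGroupsWithState idea concrete, here is a Spark-free sketch of the per-group state such a custom function could maintain for the question's use case: keep only the 10 most similar matches seen so far for each input_id. The function name update_top_matches and the (similarity, old_IDF) state shape are illustrative assumptions, not a Spark API.

```python
import heapq

TOP_N = 10

def update_top_matches(state, new_rows):
    """Update the per-input_id state with a micro-batch of new rows.

    state:    list of (similarity, old_idf) pairs, the best matches so far
    new_rows: iterable of (similarity, old_idf) pairs for one input_id
    Returns the new state: at most TOP_N pairs with the highest similarity.
    """
    heap = list(state)
    heapq.heapify(heap)  # min-heap: heap[0] is the weakest kept match
    for sim, old_idf in new_rows:
        if len(heap) < TOP_N:
            heapq.heappush(heap, (sim, old_idf))
        elif sim > heap[0][0]:
            # new match beats the weakest kept one: swap them in O(log N)
            heapq.heapreplace(heap, (sim, old_idf))
    return heap

# usage: feed micro-batches one at a time, carrying the state forward
state = []
state = update_top_matches(state, [(0.2, "a"), (0.9, "b"), (0.5, "c")])
```

The real stateful function would receive the rows for one key per trigger and store this list in the group state; the heap just keeps the update cheap when batches are large.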

Mahnaz answered Oct 21 '22 07:10


Windows in Spark Structured Streaming always need time-based data; non-time-based windows are not supported.

You can run a Structured Streaming query with the default trigger (micro-batches processed as soon as possible) and group the data by a window, where the grouping is over time.
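The time-based window() used in such a groupBy conceptually assigns each event to a fixed-size tumbling bucket. The bucketing itself is just flooring the timestamp, as this Spark-free sketch shows (window_bucket is an illustrative helper, not a Spark function):

```python
from datetime import datetime, timedelta, timezone

def window_bucket(ts: datetime, size: timedelta) -> datetime:
    """Start of the tumbling window of the given size that contains ts."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    offset = (ts - epoch) % size  # how far ts is into its window
    return ts - offset

# events at 16:07 and 16:09 land in the same 10-minute bucket [16:00, 16:10)
t = datetime(2022, 10, 16, 16, 7, tzinfo=timezone.utc)
bucket = window_bucket(t, timedelta(minutes=10))
```

Grouping by this bucket (plus a watermark) is what lets Spark eventually close a window and emit its aggregate, which is why time-based windows work on a stream while arbitrary over() partitions do not.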

Reference: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time

Pablo López Gallego answered Oct 21 '22 08:10