 

How to expire state of dropDuplicates in structured streaming to avoid OOM?

I want to count unique accesses (distinct uuids) for each day using Spark Structured Streaming, so I use the following code:

.dropDuplicates("uuid")

On the next day, the state maintained for today should be dropped, so that I get the right count of unique accesses for that day and avoid an OOM. The Spark documentation recommends using dropDuplicates with a watermark, for example:

.withWatermark("timestamp", "1 day")
.dropDuplicates("uuid", "timestamp")

However, the watermark column must then be included in dropDuplicates for the state to be cleaned up. In that case uuid and timestamp are used as a combined key, so only rows with the same uuid and the same timestamp are deduplicated, which is not what I want.
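To make the difference concrete, here is a plain-Scala model (no Spark involved) of which rows survive deduplication under each key. The `Event` class and the sample data are hypothetical; `distinctBy` stands in for `dropDuplicates` and, like it, keeps one row per key.

```scala
// Hypothetical event type: a visitor id plus an event time in epoch seconds
case class Event(uuid: String, timestamp: Long)

val events = Seq(
  Event("a", 1000L), // visitor a, first visit
  Event("a", 5000L), // visitor a again, same day, different timestamp
  Event("b", 2000L)
)

// Key (uuid, timestamp): repeat visits at different times are NOT collapsed
val byUuidAndTs = events.distinctBy(e => (e.uuid, e.timestamp))

// Key uuid only: each visitor is kept once, which is the desired daily count
val byUuid = events.distinctBy(_.uuid)

println(s"(uuid, timestamp) key: ${byUuidAndTs.size} rows")
println(s"uuid key: ${byUuid.size} rows")
```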

So is there a perfect solution?

Kevin asked Aug 03 '17 03:08


People also ask

How do you handle late data in structured streaming?

Watermarking is the feature Spark Structured Streaming uses to handle data that arrives late. The engine keeps intermediate state so that late rows can still update earlier results, and the watermark (the maximum event time seen so far minus a user-specified delay) tells it when that state is old enough to be dropped.
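As a rough illustration only, the sketch below models a watermark in plain Scala; it is not Spark's implementation. The watermark trails the maximum event time seen by a fixed delay, and rows whose event time falls behind it are treated as too late. The class name and numbers are hypothetical. Note that Spark advances the watermark between micro-batches rather than per row, and only guarantees that data within the delay is not dropped; data later than that may or may not be dropped.

```scala
// Simplified watermark model: watermark = max event time seen - allowed delay
final class WatermarkTracker(delaySeconds: Long) {
  private var maxEventTime = Long.MinValue

  def watermark: Long =
    if (maxEventTime == Long.MinValue) Long.MinValue else maxEventTime - delaySeconds

  // Returns true if the event is accepted, false if it is behind the watermark
  def offer(eventTime: Long): Boolean = {
    val accepted = eventTime >= watermark
    if (eventTime > maxEventTime) maxEventTime = eventTime
    accepted
  }
}

val tracker = new WatermarkTracker(delaySeconds = 600) // 10-minute delay
val results = Seq(1000L, 1500L, 2000L, 1300L, 900L).map(t => t -> tracker.offer(t))
results.foreach { case (t, ok) => println(s"event at $t accepted = $ok") }
```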

Which property must a Spark structured streaming sink possess to ensure end to end exactly once semantics?

Exactly-once semantics are only possible if the source is replayable and the sink is idempotent.
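A minimal sketch of what an idempotent sink means, assuming a hypothetical sink that receives each micro-batch together with its batch ID (similar in spirit to the `batchId` argument that Spark's `foreachBatch` passes). If the source replays a batch after a failure, the sink recognises the ID and skips it, so each batch takes effect once. This is plain Scala, not a Spark API.

```scala
import scala.collection.mutable

// Hypothetical idempotent sink: remembers which batch ids were already committed
final class IdempotentSink {
  private val committedBatches = mutable.Set.empty[Long]
  val rows = mutable.ArrayBuffer.empty[String]

  def write(batchId: Long, batch: Seq[String]): Unit = {
    // A replayed batch (same id) is ignored instead of being written twice
    if (!committedBatches.contains(batchId)) {
      rows ++= batch
      committedBatches += batchId
    }
  }
}

val sink = new IdempotentSink
sink.write(0L, Seq("a", "b"))
sink.write(1L, Seq("c"))
sink.write(1L, Seq("c")) // source replays batch 1 after a simulated failure
println(sink.rows.mkString(",")) // a,b,c
```

In a real sink, the data and the record of the committed batch ID would have to be persisted together (atomically and durably) for this to survive restarts.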

What is difference between Dstreams and structured streaming?

Internally, a DStream is a sequence of RDDs: Spark receives real-time data and divides it into small batches for the execution engine. Structured Streaming, in contrast, is built on the Spark SQL engine and exposes streams through the DataFrame/Dataset API.

How do I remove duplicates in Spark?

Duplicate rows can be removed from a Spark SQL DataFrame with distinct() or dropDuplicates(): distinct() removes rows that have the same values in all columns, whereas dropDuplicates() can also remove rows that have the same values in a selected subset of columns.
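The same distinction can be sketched with ordinary Scala collections; this is an analogy, not Spark code. `distinct` compares whole rows, like `DataFrame.distinct()`, while `distinctBy` on a chosen key compares only selected columns, like `dropDuplicates("uuid")`. The `Visit` rows are hypothetical.

```scala
// Hypothetical rows with two columns
case class Visit(uuid: String, page: String)

val visits = Seq(
  Visit("a", "/home"),
  Visit("a", "/home"), // exact duplicate row
  Visit("a", "/cart"), // same uuid, different page
  Visit("b", "/home")
)

// Like distinct(): only rows identical in every column are removed
val allColumns = visits.distinct

// Like dropDuplicates("uuid"): rows are compared on the uuid column only
val uuidOnly = visits.distinctBy(_.uuid)

println(allColumns.size) // 3
println(uuidOnly.size) // 2
```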


1 Answer

After a few days of effort, I finally found a way myself.

While studying the source code of watermark and dropDuplicates, I discovered that, besides a timestamp event-time column, a watermark can also be defined on a window column, so we can use the following code:

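import org.apache.spark.sql.functions.window
import spark.implicits._ // enables $"col"; assumes `spark` is the active SparkSession

// `events` is the streaming DataFrame with `timestamp` and `uuid` columns
val deduped = events
  .select(
    window($"timestamp", "1 day"), // one tumbling window per day
    $"timestamp",
    $"uuid"
  )
  .withWatermark("window", "1 day") // watermark defined on the window column
  .dropDuplicates("uuid", "window") // at most one row per uuid per day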

Since all events on the same day fall into the same window, this produces the same result as deduplicating on uuid alone within each day, while the watermark lets Spark drop the state kept for past days. Hope this helps someone.
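For intuition about why this bounds the state, here is a simplified plain-Scala model of the pattern; it is not Spark's actual deduplication operator. Each event is mapped to the end of its one-day window, the dedup key is (uuid, window), and after each micro-batch the watermark becomes the latest window end seen minus one day, evicting keys whose window end is at or before it. Rows behind the watermark are skipped, and the per-window counter stands in for a downstream daily count. The class, method names, batch layout, and exact eviction boundary are assumptions made for illustration.

```scala
import scala.collection.mutable

val DaySeconds = 86400L

// Hypothetical event: visitor id plus event time in epoch seconds
case class Event(uuid: String, timestamp: Long)

// Simplified model of window-based dedup with watermark-driven state eviction
final class DailyDedup(delaySeconds: Long) {
  private val state = mutable.Set.empty[(String, Long)] // (uuid, windowEnd)
  private var maxWindowEnd = Long.MinValue
  // windowEnd -> number of unique uuids seen in that day
  val dailyCounts = mutable.Map.empty[Long, Int].withDefaultValue(0)

  private def watermark: Long =
    if (maxWindowEnd == Long.MinValue) Long.MinValue else maxWindowEnd - delaySeconds

  def stateSize: Int = state.size

  def processBatch(batch: Seq[Event]): Unit = {
    val wm = watermark // the watermark is fixed for the duration of a batch
    for (e <- batch) {
      val windowEnd = e.timestamp - e.timestamp % DaySeconds + DaySeconds
      val key = (e.uuid, windowEnd)
      // Skip rows behind the watermark and keys that were already seen
      if (windowEnd > wm && !state.contains(key)) {
        state += key
        dailyCounts(windowEnd) += 1
      }
      maxWindowEnd = math.max(maxWindowEnd, windowEnd)
    }
    // After the batch, advance the watermark and evict expired keys
    val newWm = watermark
    state.filterInPlace { case (_, windowEnd) => windowEnd > newWm }
  }
}

val dedup = new DailyDedup(delaySeconds = DaySeconds)
dedup.processBatch(Seq(Event("a", 100L), Event("a", 5000L), Event("b", 200L))) // day 0
dedup.processBatch(Seq(Event("a", DaySeconds + 10L)))                         // day 1
dedup.processBatch(Seq(Event("c", 2 * DaySeconds + 10L)))                     // day 2
println(dedup.dailyCounts.toSeq.sorted)
println(s"keys still in state: ${dedup.stateSize}")
```

In this model, visitor "a" is counted once on day 0 despite two visits and again on day 1, and the keys for a day disappear from the state once events from about a day later arrive, which is the behavior the question asks for.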

Kevin answered Sep 19 '22 13:09