I would like to do multiple aggregations in Spark Structured Streaming.
Something like this:
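For example, a chained aggregation of this shape (a minimal sketch using the built-in rate source so it is self-contained; the real columns differ):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
stream = spark.readStream.format('rate').option('rowsPerSecond', 10).load()

# First streaming aggregation: row counts per time window
counts = stream.groupBy(F.window('timestamp', '10 seconds')).count()

# Second aggregation, applied on top of the first
total = counts.agg(F.sum('count'))

# Starting this query raises the error quoted below
total.writeStream.outputMode('complete').format('console').start()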
When I run this in Structured Streaming, it gives me an error "Multiple streaming aggregations are not supported with streaming DataFrames/Datasets".
Is there a way to do such multiple aggregations in Structured Streaming?
Spark doesn't have a distinct method that takes the columns to run the distinct on; however, it provides another signature of dropDuplicates() that takes multiple columns to eliminate duplicates by. Note that calling dropDuplicates() on a DataFrame returns a new DataFrame with the duplicate rows removed. This is relevant here because streaming deduplication does not count as a streaming aggregation, so a dropDuplicates() can still be followed by a single aggregation.
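A minimal sketch of that pattern (the rate source and the derived user_id column are illustrative assumptions, not from the original post):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# The built-in rate source stands in for a real stream; derive a synthetic key
events = (spark.readStream.format('rate').option('rowsPerSecond', 10).load()
    .withColumn('user_id', F.col('value') % 5))

deduped = (events
    .withWatermark('timestamp', '10 minutes')    # bounds the deduplication state
    .dropDuplicates(['user_id', 'timestamp']))   # the multi-column signature

# One aggregation after dropDuplicates() is still allowed on a stream
counts = deduped.groupBy('user_id').count()

(counts.writeStream
    .outputMode('complete')
    .format('console')
    .start()
    .awaitTermination())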
A note on output modes: in Update mode, if the query doesn't contain aggregations, it is equivalent to Append mode.
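To make that note concrete, a minimal sketch (assuming the built-in rate source): a stateless query in 'update' mode emits exactly the rows 'append' would:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
stream = spark.readStream.format('rate').option('rowsPerSecond', 1).load()

# No aggregation in this query, so 'update' behaves exactly like 'append'
(stream.select('value')
    .writeStream
    .outputMode('update')
    .format('console')
    .start()
    .awaitTermination())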
Spark Streaming (the older DStream API) receives real-time data and divides it into micro-batches for the execution engine. Structured Streaming, in contrast, is built on the Spark SQL API for data stream processing. In the end, Structured Streaming queries are optimized by the Catalyst optimizer and translated into RDDs for execution under the hood.
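A quick side-by-side of the two APIs (the socket host and port are placeholder values):

# Older DStream API: micro-batches of RDDs via a StreamingContext
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, batchDuration=5)   # assumes an existing SparkContext `sc`
lines = ssc.socketTextStream('localhost', 9999)

# Structured Streaming: the same source through the Spark SQL API
stream = (spark.readStream                    # assumes an existing SparkSession `spark`
    .format('socket')
    .option('host', 'localhost')
    .option('port', 9999)
    .load())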
This is not supported, but there are workarounds: perform a single aggregation and write the result to Kafka, then read it back from Kafka and apply the second aggregation. This has worked for me.
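A hedged sketch of that two-stage pipeline (the broker address, topic names, and checkpoint paths are all hypothetical):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
servers = 'localhost:9092'   # hypothetical broker address

# Stage 1: first aggregation, written to an intermediate Kafka topic
stage1 = (spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', servers)
    .option('subscribe', 'events')            # hypothetical input topic
    .load()
    .groupBy(F.col('key').cast('string').alias('key'))
    .count()
    # the Kafka sink expects string/binary 'key' and 'value' columns
    .select('key', F.col('count').cast('string').alias('value')))

(stage1.writeStream
    .format('kafka')
    .option('kafka.bootstrap.servers', servers)
    .option('topic', 'agg_stage1')            # hypothetical intermediate topic
    .option('checkpointLocation', '/tmp/ckpt_stage1')
    .outputMode('update')
    .start())

# Stage 2: a separate query reads the intermediate topic and aggregates again
stage2 = (spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', servers)
    .option('subscribe', 'agg_stage1')
    .load()
    .agg(F.count('*').alias('total_keys')))

(stage2.writeStream
    .outputMode('complete')
    .format('console')
    .option('checkpointLocation', '/tmp/ckpt_stage2')
    .start()
    .awaitTermination())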
As of Spark 2.4.4 (the latest at the time of writing), multiple streaming aggregations are NOT supported, but you can use the .foreachBatch() method instead.
A dummy example:
def foreach_batch_function(df, epoch_id):
    # Transformations (many aggregations) on the micro-batch DataFrame
    pass

query = (spark
    .readStream
    .format('kafka')
    .option(..)  # Kafka options, e.g. kafka.bootstrap.servers and subscribe
    .load()
    .writeStream
    .trigger(processingTime='x seconds')
    .outputMode('append')
    .foreachBatch(foreach_batch_function)
    .start())

query.awaitTermination()
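Inside foreachBatch() the DataFrame is a plain static micro-batch, so chained aggregations are legal there. A sketch of what the batch function might contain (the column names and output path are hypothetical):

from pyspark.sql import functions as F

def foreach_batch_function(df, epoch_id):
    # df is a static DataFrame holding one micro-batch, so multiple
    # aggregations in a row are fine here
    per_user = df.groupBy('user_id').agg(F.sum('amount').alias('total'))
    overall = per_user.agg(F.avg('total').alias('avg_total'))
    overall.write.mode('append').format('parquet').save('/tmp/avg_totals')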