Multiple aggregations in Spark Structured Streaming

I would like to do multiple aggregations in Spark Structured Streaming.

Something like this:

  • Read a stream of input files (from a folder)
  • Perform aggregation 1 (with some transformations)
  • Perform aggregation 2 (and more transformations)
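For example, a minimal sketch of that pattern (the schema, folder path, and column names are illustrative, and an existing SparkSession `spark` is assumed):

from pyspark.sql.types import StructType, StructField, StringType

# Illustrative schema for the input files
schema = StructType([StructField('user', StringType())])

events = (spark
    .readStream
    .schema(schema)
    .json('/path/to/input/folder'))

agg1 = events.groupBy('user').count()    # aggregation 1
agg2 = agg1.groupBy('count').count()     # aggregation 2 on agg1's result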

When I run this in Structured Streaming, it fails with the error "Multiple streaming aggregations are not supported with streaming DataFrames/Datasets".

Is there a way to do such multiple aggregations in Structured Streaming?

Kaptrain asked Dec 07 '16

People also ask

How does Spark handle duplicates in streaming?

Spark's distinct() method doesn't take a list of columns to run the distinct on; instead, Spark provides a signature of the dropDuplicates() function that takes multiple columns to eliminate duplicates. Note that calling dropDuplicates() on a DataFrame returns a new DataFrame with the duplicate rows removed.
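A short sketch, assuming df is a batch DataFrame and streaming_df a streaming one; the column names are illustrative:

# Batch DataFrame: drop duplicates considering only the listed columns
deduped = df.dropDuplicates(['user_id', 'event_time'])

# Streaming DataFrame: add a watermark so Spark can bound the state
# it keeps for deduplication
deduped_stream = (streaming_df
    .withWatermark('event_time', '10 minutes')
    .dropDuplicates(['user_id', 'event_time']))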

Which output modes are the same when the query has no aggregations?

If the query doesn't contain aggregations, Update mode is equivalent to Append mode.
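A minimal sketch (the sink and column name are illustrative): because the query below contains no aggregation, Update mode produces the same output as Append mode.

# No aggregation in this query, so 'update' behaves exactly like 'append'
(streaming_df
    .select('user_id')
    .writeStream
    .outputMode('update')
    .format('console')
    .start())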

What is the difference between Spark Streaming and Structured Streaming?

Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, the APIs are optimized with the Spark Catalyst optimizer and translated into RDDs for execution under the hood.
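The difference shows up in the entry points of the two APIs; a sketch with illustrative hosts and ports, assuming an existing SparkContext `sc` and SparkSession `spark`:

# Classic Spark Streaming: DStreams on a fixed micro-batch interval
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, batchDuration=5)
lines_dstream = ssc.socketTextStream('localhost', 9999)

# Structured Streaming: a streaming DataFrame via the Spark SQL API
lines_df = (spark.readStream
    .format('socket')
    .option('host', 'localhost')
    .option('port', 9999)
    .load())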


2 Answers

This is not supported, but there is a workaround: perform the first aggregation and save its result to Kafka, then read it back from Kafka and apply the second aggregation, as sketched below. This has worked for me.
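A minimal sketch of that round trip, assuming `events` is the streaming DataFrame to aggregate; the topic names, servers, checkpoint path, and columns are all illustrative:

# Stage 1: first aggregation, written to an intermediate Kafka topic
agg1 = events.groupBy('user').count()

(agg1
    .selectExpr('CAST(user AS STRING) AS key',
                'CAST(count AS STRING) AS value')
    .writeStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    .option('topic', 'agg1-topic')
    .option('checkpointLocation', '/tmp/agg1-ckpt')
    .outputMode('update')
    .start())

# Stage 2: a separate query reads the topic back and aggregates again
agg2 = (spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    .option('subscribe', 'agg1-topic')
    .load()
    .selectExpr('CAST(value AS STRING) AS cnt')
    .groupBy('cnt')
    .count())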

Mahesh Chand answered Sep 26 '22


As of Spark 2.4.4 (the latest at the time of writing), multiple streaming aggregations are NOT supported, but you can use the .foreachBatch() method instead.

A dummy example (the Kafka options and trigger interval below are placeholders):

def foreach_batch_function(df, epoch_id):
    # Within foreachBatch, df is a static DataFrame for this micro-batch,
    # so the full batch API, including multiple aggregations and further
    # transformations, is available here.
    pass

query = (spark
    .readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')  # placeholder
    .option('subscribe', 'some-topic')                    # placeholder
    .load()
    .writeStream
    .trigger(processingTime='10 seconds')                 # placeholder interval
    .outputMode('append')
    .foreachBatch(foreach_batch_function)
    .start())

query.awaitTermination()
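Note that foreachBatch provides at-least-once write guarantees: a micro-batch can be retried with the same epoch_id, so the epoch_id can be used to deduplicate output or make the writes idempotent.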
ggeop answered Sep 24 '22