I am getting the following error when I run my Spark job:
org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets;;
I am not sure whether the issue is caused by the lack of a watermark, which I don't know how to apply in this context. The following aggregation is applied:
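def aggregateByValue(): DataFrame = {
  df.withColumn("Value", expr("(BookingClass, Value)"))   // pack BookingClass and Value into one struct
    .groupBy("AirlineCode", "Origin", "Destination", "PoS", "TravelDate", "StartSaleDate", "EndSaleDate", "avsFlag")
    .agg(collect_list("Value").as("ValueSeq"))             // one list of structs per group
    .drop("Value")                                          // no-op: Value is not part of the aggregated output
}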
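The snippet above uses a `df` that is never defined, and the usage below chains aggregateByValue directly on theDF, which suggests the methods are extension methods. A minimal sketch, assuming an implicit class wrapper (the names AggregateByValueSketch and ValueOps, the sample rows, and the trimming to three grouping keys are all hypothetical), shows how that could be wired and what the aggregation produces on a static DataFrame, where no output mode is involved. It uses struct(...), which should build the same two-field struct as expr("(BookingClass, Value)").

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object AggregateByValueSketch {
  // Hypothetical wrapper: makes aggregateByValue callable on any DataFrame,
  // which is one way `df` in the original snippet could be in scope.
  implicit class ValueOps(df: DataFrame) {
    def aggregateByValue(): DataFrame =
      df.withColumn("Value", struct(col("BookingClass"), col("Value"))) // replace Value with a struct
        .groupBy("AirlineCode", "Origin", "Destination")                // trimmed grouping keys
        .agg(collect_list("Value").as("ValueSeq"))                      // one list of structs per group
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("aggregate-by-value-sketch")
      .master("local[1]")
      .getOrCreate()
    import spark.implicits._

    // Made-up rows: (AirlineCode, Origin, Destination, BookingClass, Value)
    val sample = Seq(
      ("AA", "JFK", "LHR", "Y", 120.0),
      ("AA", "JFK", "LHR", "J", 640.0),
      ("BA", "LHR", "CDG", "Y", 90.0)
    ).toDF("AirlineCode", "Origin", "Destination", "BookingClass", "Value")

    // Batch DataFrame: the aggregation runs without any output-mode restriction.
    sample.aggregateByValue().show(truncate = false)
    spark.stop()
  }
}
```

On a batch DataFrame this runs fine; the AnalysisException only appears once the same plan is written as a stream in append mode.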
Usage:
val theGroupedDF = theDF
  .multiplyYieldByHundred
  .explodeDates
  .aggregateByValue

val query = theGroupedDF.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()
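Changing the output mode to "complete" solved the issue. In append mode, Spark writes each result row only once and never updates it afterwards; without a watermark, any group can still receive new input at any time, so Spark cannot tell when an aggregate is final and rejects the query. Complete mode instead rewrites the whole result table on every trigger, so it needs no watermark (at the cost of keeping state for every group indefinitely). Update mode (outputMode("update")) would also be accepted here; it emits only the groups whose aggregates changed in each trigger. The working query: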
val query = theGroupedDF.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
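If append mode is actually required (for example, file sinks support only append), the aggregation has to be bounded by a watermark: call withWatermark on an event-time timestamp column before the aggregation, and include that column, or a window over it, in the groupBy. The schema in the question does not show an event-time column, so this minimal sketch assumes one: it uses Spark's built-in "rate" test source, whose timestamp column stands in for it. The object name, the stand-in AirlineCode and Booking columns, the 5-second window and the 10-second delay are all hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object WatermarkAppendSketch {
  // Builds a watermarked, windowed version of the collect_list aggregation.
  def buildGrouped(spark: SparkSession): DataFrame = {
    import spark.implicits._
    spark.readStream
      .format("rate")                      // built-in test source with columns `timestamp`, `value`
      .option("rowsPerSecond", "5")
      .load()
      // Hypothetical stand-ins for the real AirlineCode / (BookingClass, Value) columns.
      .withColumn("AirlineCode", when($"value" % 2 === 0, "AA").otherwise("BA"))
      .withColumn("Booking", struct(lit("Y").as("BookingClass"), $"value".cast("double").as("Fare")))
      // Accept events up to 10 s late; state for older windows can then be finalized.
      .withWatermark("timestamp", "10 seconds")
      // Append mode requires grouping on the watermarked column or a window over it.
      .groupBy(window($"timestamp", "5 seconds"), $"AirlineCode")
      .agg(collect_list("Booking").as("ValueSeq"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("watermark-append-sketch")
      .master("local[2]")
      .getOrCreate()

    val query = buildGrouped(spark).writeStream
      .outputMode("append")                // each window is emitted once, after the watermark passes its end
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}
```

Note the trade-off: results become per time window rather than over the whole stream, and a window's ValueSeq only shows up once the watermark has moved past the window's end, so output lags event time by at least the configured delay.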