 

Structured Streaming exception when using append output mode with watermark

Even though I'm using withWatermark(), I get the following error message when I run my Spark job:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;

From what I can see in the programming guide, this exactly matches the intended usage (and the example code). Does anyone know what might be wrong?

Thanks in advance!

Relevant code (Java 8, Spark 2.2.0), with the imports it needs:

import static org.apache.spark.sql.functions.*;
import static org.apache.spark.sql.types.DataTypes.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;

StructType logSchema = new StructType()
        .add("timestamp", TimestampType)
        .add("key", IntegerType)
        .add("val", IntegerType);

Dataset<Row> kafka = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topics)
        .load();

Dataset<Row> parsed = kafka
        .select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
        .select("parsed_value.*");

Dataset<Row> tenSecondCounts = parsed
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            parsed.col("key"),
            window(parsed.col("timestamp"), "1 day"))
        .count();

StreamingQuery query = tenSecondCounts
        .writeStream()
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .outputMode("append")
        .format("console")
        .option("truncate", false)
        .start();
Ray J, asked Aug 08 '17


People also ask

What is watermarking in structured streaming?

Watermarking is a feature in Spark Structured Streaming that is used to handle the data that arrives late. Spark Structured Streaming can maintain the state of the data that arrives, store it in memory, and update it accurately by aggregating it with the data that arrived late.
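As a minimal sketch of that idea in the question's Java setting (the events Dataset and its eventTime column are hypothetical, not from the original post): records arriving up to 10 minutes behind the latest event time seen so far are still folded into their 5-minute window; anything later is dropped and the old window state can be cleaned up.

```java
// Hypothetical streaming Dataset<Row> `events` with an "eventTime" column.
// The watermark tells Spark how long to retain window state so that
// records arriving up to 10 minutes late are still counted.
Dataset<Row> lateTolerant = events
        .withWatermark("eventTime", "10 minutes")
        .groupBy(window(col("eventTime"), "5 minutes"))
        .count();
```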

What is the difference between DStreams and Structured Streaming?

Internally, a DStream is a sequence of RDDs. Spark receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the SparkSQL API for data stream processing.

Which property must a Spark Structured Streaming sink possess to ensure end-to-end exactly-once semantics?

Exactly-once semantics are only possible if the source is replayable and the sink is idempotent.

When should you use structured streaming?

Structured Streaming allows you to take the same operations that you perform in batch mode using Spark's structured APIs, and run them in a streaming fashion. This can reduce latency and allow for incremental processing.


1 Answer

The problem is in parsed.col. Replacing it with the col function will fix the issue. I would suggest always using the col function instead of Dataset.col.

Dataset.col returns a resolved column, while col returns an unresolved column.

parsed.withWatermark("timestamp", "10 minutes") creates a new Dataset whose columns have the same names as before. The watermark information is attached to the timestamp column of that new Dataset, not to parsed.col("timestamp"), so the columns you pass to groupBy carry no watermark.

When you use unresolved columns, Spark figures out the correct columns for you.

zsxwing, answered Oct 27 '22