Why does streaming query with update output mode print out all rows?

I have three text files in my directory:

a.txt

A B
C D
A E
F

b.txt

A B
C D
A E

c.txt

A B
C D
A E
G

I use the following streaming query:

import java.util.Date
import org.apache.spark.sql.types.StructType
import spark.implicits._

val schema = new StructType().add("value", "string")
val lines = spark
  .readStream
  .schema(schema)
  .option("maxFilesPerTrigger", 1) // process one file per micro-batch
  .text(...)
  .as[String]

val wordCounts = lines.flatMap(_.split("\\s+")).groupBy("value").count()

val query = wordCounts.writeStream
  .queryName("t")
  .outputMode("update") // <-- output mode: update
  .format("memory")
  .start()

// Poll the in-memory table once per second
while (true) {
  spark.sql("select * from t").show(truncate = false)
  println(new Date())
  Thread.sleep(1000)
}

The query always outputs the following results:

+-----+-----+
|value|count|
+-----+-----+
|A    |2    |
|B    |1    |
|C    |1    |
|D    |1    |
|E    |1    |
|A    |4    |
|B    |2    |
|C    |2    |
|D    |2    |
|E    |2    |
|G    |1    |
|A    |6    |
|B    |3    |
|C    |3    |
|D    |3    |
|E    |3    |
|F    |1    |
+-----+-----+

It looks like each file's result is appended to the output (as in Append output mode), and I'm not sure I understand what Update mode means. How does the update output mode work?

Tom asked Jul 27 '18

1 Answer

In Append mode, only the new rows added to the Result Table since the last trigger are output to the sink. This is supported only for queries where the rows added to the Result Table will never change. Hence, this mode guarantees that each row is output exactly once. A sketch of an append-mode variant follows.
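For illustration, here is a minimal sketch of an append-mode aggregation (not part of the original question). Because append mode only emits rows that will never change, a streaming aggregation needs a watermark on an event-time column; the input files here carry no event time, so the timestamp column below is fabricated with current_timestamp() purely for the example:

import org.apache.spark.sql.functions._

// Assumes `lines` is the Dataset[String] from the question.
val words = lines.flatMap(_.split("\\s+")).toDF("value")

val windowedCounts = words
  .withColumn("timestamp", current_timestamp()) // fabricated event time
  .withWatermark("timestamp", "30 seconds")     // lets Spark finalize old windows
  .groupBy(window(col("timestamp"), "30 seconds"), col("value"))
  .count()

// Each aggregate row is emitted exactly once, after its window can no longer change.
val appendQuery = windowedCounts.writeStream
  .outputMode("append")
  .format("console")
  .start()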

In Update mode, only the rows in the Result Table that were updated since the last trigger are output to the sink.

To better illustrate the modes, I changed the sink to the console, modified the input data slightly, and ran the query in update mode. Below are the results:

a.txt
A B
C D
A E
F X
Y Z

b.txt
A B
C D
A E

c.txt
A B
C D
A E
G


scala> val query = wordCounts.writeStream.queryName("t").outputMode("update").format("console").start()
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1985f8e3

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    F|    1|
|    E|    1|
|    B|    1|
|    Y|    1|
|    D|    1|
|    C|    1|
|    Z|    1|
|    A|    2|
|    X|    1|
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    E|    2|
|    B|    2|
|    D|    2|
|    C|    2|
|    A|    4|
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    E|    3|
|    B|    3|
|    D|    3|
|    C|    3|
|    A|    6|
|    G|    1|
+-----+-----+

You can see that for every batch, only the rows that were updated since the last trigger are printed to the console (for example, the counts for X, Y, and Z do not appear in Batches 1 and 2, since they were not updated).

In your case, you are writing to the memory sink. The memory sink is not evicted between batches, so the rows from every previous batch are retained, and all of them come back when you query the table. That is why the output looks appended rather than updated. Hope the modes are clear now.
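As a workaround sketch (not part of the original answer): since a word's count only ever increases in this job, you can recover the latest state from the accumulated memory table by taking the maximum count per word:

// Collapse the accumulated history down to the most recent count per word.
spark.sql("select value, max(`count`) as count from t group by value")
  .show(truncate = false)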

Lakshman Battini answered Sep 28 '22