I am trying to aggregate a stream with two different windows and print the results to the console. However, only the first streaming query is printed; the output of tenSecsQ never appears in the console.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;

SparkSession spark = SparkSession
    .builder()
    .appName("JavaStructuredNetworkWordCountWindowed")
    .config("spark.master", "local[*]")
    .getOrCreate();

Dataset<Row> lines = spark
    .readStream()
    .format("socket")
    .option("host", host)
    .option("port", port)
    .option("includeTimestamp", true)
    .load();

Dataset<Row> words = lines
    .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
    .toDF("word", "timestamp");

// 5 second window
Dataset<Row> fiveSecs = words
    .groupBy(
        functions.window(words.col("timestamp"), "5 seconds"),
        words.col("word"))
    .count()
    .orderBy("window");

// 10 second window
Dataset<Row> tenSecs = words
    .groupBy(
        functions.window(words.col("timestamp"), "10 seconds"),
        words.col("word"))
    .count()
    .orderBy("window");
I then start a streaming query for both the 5s and the 10s aggregated streams. Only the 5s output is printed to the console; the 10s output never shows up:
// Start writeStream() for the 5s window
StreamingQuery fiveSecQ = fiveSecs.writeStream()
    .queryName("5_secs")
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start();

// Start writeStream() for the 10s window
StreamingQuery tenSecsQ = tenSecs.writeStream()
    .queryName("10_secs")
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start();

tenSecsQ.awaitTermination();
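Note that awaiting only tenSecsQ means a failure in fiveSecQ would go unnoticed; blocking on the session's query manager avoids that (though it makes no difference to the problem described here):

// Block until any of the active queries terminates (e.g., fails),
// instead of watching only tenSecsQ:
spark.streams().awaitAnyTermination();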
I've been investigating this question.
Summary: each query in Structured Streaming consumes the source data independently. The socket source creates a new connection for every query that is defined. The behavior seen in this case occurs because nc delivers the input data to only the first connection.

Hence, it is not possible to define multiple aggregations over the socket source unless we can ensure that the connected TCP server delivers the same data to each open connection.
I discussed this question on the Spark mailing list. Databricks developer Shixiong Zhu answered:
Spark creates one connection for each query. The behavior you observed is because of how "nc -lk" works. If you use netstat to check the TCP connections, you will see there are two connections when two queries are started. However, "nc" forwards the input to only one connection.
I verified this behavior with a small experiment.

First, I created a SimpleTCPWordServer that delivers random words to each open connection. I won't reproduce that server here, but a minimal sketch of the idea (written in Java for consistency with the question; not the original implementation) could look like this:
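import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class SimpleTCPWordServer {
    private static final String[] WORDS = {
        "agenda", "amazing", "asset", "bear", "belong", "bottle",
        "breath", "calculate", "ceiling", "cell", "champion"
    };

    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(9999)) {
            while (true) {
                // Every streaming query opens its own connection;
                // each connection gets an independent stream of words.
                Socket client = server.accept();
                new Thread(() -> serve(client)).start();
            }
        }
    }

    private static void serve(Socket client) {
        Random rnd = new Random();
        try (PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            while (!out.checkError()) {
                out.println(WORDS[rnd.nextInt(WORDS.length)]);
                Thread.sleep(500);
            }
        } catch (Exception e) {
            // client disconnected; let this handler thread end
        }
    }
}

Second, a basic Structured Streaming job declares two queries against that server. The only difference between them is that the second query adds an extra constant column to differentiate its output: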
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.streaming.Trigger

val lines = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .option("includeTimestamp", true)
  .load()

val q1 = lines.writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

val q2 = lines.withColumn("foo", lit("foo")).writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("7 seconds"))
  .start()
If Structured Streaming consumed only one stream, we should see the same words delivered to both queries. If instead each query consumes a separate stream, each query will report different words.
This is the observed output:
-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-------------------+
| value| timestamp|
+--------+-------------------+
|champion|2017-08-14 13:54:51|
+--------+-------------------+
+------+-------------------+---+
| value| timestamp|foo|
+------+-------------------+---+
|belong|2017-08-14 13:54:51|foo|
+------+-------------------+---+
-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-------------------+---+
| value| timestamp|foo|
+-------+-------------------+---+
| agenda|2017-08-14 13:54:52|foo|
|ceiling|2017-08-14 13:54:52|foo|
| bear|2017-08-14 13:54:53|foo|
+-------+-------------------+---+
-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-------------------+
| value| timestamp|
+----------+-------------------+
| breath|2017-08-14 13:54:52|
|anticipate|2017-08-14 13:54:52|
| amazing|2017-08-14 13:54:52|
| bottle|2017-08-14 13:54:53|
| calculate|2017-08-14 13:54:53|
| asset|2017-08-14 13:54:54|
| cell|2017-08-14 13:54:54|
+----------+-------------------+
We can clearly see that each query consumes a different stream. It appears that it is not possible to define multiple aggregations over the data delivered by the socket source unless we can guarantee that the TCP backend server delivers exactly the same data to each open connection.
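A possible workaround (my own sketch, not something from the mailing-list discussion) is to route the socket data through a replayable source: one query drains the socket into files, and the windowed aggregations then read from the file source, which does serve identical data to every query. Applied to the question's Java code, with hypothetical paths:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

// Assumes the 'spark' session and the 'words' Dataset from the question.

// 1) A single query consumes the socket and persists the raw records.
StreamingQuery relay = words.writeStream()
    .format("parquet")
    .option("path", "/tmp/words")                   // hypothetical path
    .option("checkpointLocation", "/tmp/words-chk") // hypothetical path
    .start();

// 2) Both aggregations read the same files, so they see the same data.
Dataset<Row> replayed = spark.readStream()
    .schema(words.schema())
    .parquet("/tmp/words");
// Build the 5s and 10s windowed counts on 'replayed' instead of 'words'.

This adds some latency and disk usage, but it guarantees that both queries observe exactly the same records.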