This is a Spark Streaming program written in Scala. It counts the number of words received from a socket every second. The result is a per-batch word count, for example, the count of words from time 0 to 1, and then the count from time 1 to 2. But I wonder if there is some way to alter this program so that we get an accumulated word count, that is, the word count from time 0 up till now.
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create a socket stream on target ip:port and count the
// words in the input stream of \n delimited text (eg. generated by 'nc')
// Note that the storage level uses no replication, which is fine only when
// running locally; replication is necessary in a distributed scenario for
// fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
Structured Streaming – Complete Output Mode: this mode is used only when you have streaming aggregated data. One example would be counting the words in streaming data, aggregating them with the previous counts, and outputting the results to a sink.
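For reference, a minimal sketch of the Structured Streaming word count in complete output mode; the host localhost, port 9999, and console sink are placeholder assumptions. In complete mode the entire accumulated counts table is emitted on every trigger:

import org.apache.spark.sql.SparkSession

object StructuredNetworkWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("StructuredNetworkWordCount")
      .getOrCreate()
    import spark.implicits._

    // Read lines from a socket source (host/port are placeholders)
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Split lines into words and count them across all data seen so far
    val words = lines.as[String].flatMap(_.split(" "))
    val wordCounts = words.groupBy("value").count()

    // Complete output mode: the full updated counts table is printed each trigger
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}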
Sliding interval – the amount of time, in seconds, by which the window shifts. In the previous example the sliding interval is 1 (since the calculation is kicked off every second), i.e. at time=1, time=2, time=3... If you set the sliding interval to 2, you get a calculation at time=1, time=3, time=5... A sketch of a windowed variant follows below.
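As an illustration, here is a sketch of a windowed count built on the same words DStream as the snippet above; the 10-second window and 2-second sliding interval are just example values:

// Count words over the last 10 seconds, recomputed every 2 seconds
val windowedWordCounts = words
  .map(x => (x, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(10), Seconds(2))
windowedWordCounts.print()

Note that both the window duration and the sliding interval must be multiples of the batch interval (1 second here).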
Apache Spark Structured Streaming is a near-real-time processing engine that offers end-to-end fault tolerance with exactly-once processing guarantees, using familiar Spark APIs.
You can use a StateDStream for this. There is an example of stateful word count in Spark's examples.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatefulNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // Helper from Spark's examples package that sets a reasonable log level
    StreamingExamples.setStreamingLogLevels()

    // Merge this batch's counts for a key with the previously accumulated count
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")

    // Create a NetworkInputDStream on target ip:port and count the
    // words in the input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using updateStateByKey
    // This will give a DStream made of state (which is the cumulative count of the words)
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
The way it works is that you get a Seq[T] for each batch, and then you update an Option[T], which acts like an accumulator. The reason it is an Option is that on the first batch it will be None, and it stays that way unless it is updated. In this example the count is an Int; if you are dealing with a lot of data you may even want to use a Long or a BigInt.
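For instance, a sketch of the same update function accumulating into a Long instead (the updateFuncLong name is purely illustrative):

// Same update logic, but the accumulated state is a Long to avoid Int overflow
val updateFuncLong = (values: Seq[Int], state: Option[Long]) => {
  val currentCount = values.foldLeft(0L)(_ + _)
  val previousCount = state.getOrElse(0L)
  Some(currentCount + previousCount)
}
val stateDstreamLong = wordDstream.updateStateByKey[Long](updateFuncLong)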