The following is simple code to get the word count over a window of 30 seconds with a slide interval of 10 seconds.
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.api.java.function._
import org.apache.spark.streaming.api._
import org.apache.spark.storage.StorageLevel
val ssc = new StreamingContext(sc, Seconds(5))
// read from text file
val lines0 = ssc.textFileStream("test")
val words0 = lines0.flatMap(_.split(" "))
// read from socket
val lines1 = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val words1 = lines1.flatMap(_.split(" "))
val words = words0.union(words1)
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
wordCounts.print()
ssc.checkpoint(".")
ssc.start()
ssc.awaitTermination()
However, I am getting an error from this line:
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
specifically from the _ + _ part. The error is:
51: error: missing parameter type for expanded function ((x$2, x$3) => x$2.$plus(x$3))
Could anybody tell me what the problem is? Thanks!
Spark Streaming: Window
The simplest windowing function is window, which lets you create a new DStream computed by applying the windowing parameters to the old DStream. You can use any of the DStream operations on the new stream, so you get all the flexibility you want.
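For example, a quick sketch using the words DStream from the question above (illustrative only, not part of the original post):
// Create a new DStream holding the last 30 seconds of data, recomputed every 10 seconds.
val windowedWords = words.window(Seconds(30), Seconds(10))
// Any DStream operation can be applied to the windowed stream, e.g. counting occurrences of each word.
windowedWords.countByValue().print()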
Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches. Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data.
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit. Save each RDD in this DStream as a SequenceFile of serialized objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS.suffix".
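For example, a sketch that persists the windowed counts from the question (the "counts" prefix and "obj" suffix are just illustrative names):
// Save each windowed RDD of wordCounts as object files named counts-<TIME_IN_MS>.obj.
wordCounts.saveAsObjectFiles("counts", "obj")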
A checkpoint helps build fault-tolerant and resilient Spark applications. Spark Structured Streaming maintains intermediate state on HDFS-compatible file systems to recover from failures. To specify the checkpoint in a streaming query, we use the checkpointLocation parameter.
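For a Structured Streaming query, that looks roughly like this (a sketch only; resultDF and the checkpoint path are placeholders, not from the original post):
// Sketch: resultDF stands for some streaming DataFrame; the path is a placeholder.
val query = resultDF.writeStream
  .outputMode("append")
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/wordcount")
  .start()
query.awaitTermination()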
This is extremely easy to fix; just be explicit about the types:
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
The reason Scala can't infer the parameter types in this case is explained in this answer.
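In short, as far as I can tell, reduceByKeyAndWindow is overloaded, so Scala cannot infer the parameter types of the placeholder function _ + _. Another way around it is to define the reduce function with explicit types once and pass it in (a sketch using the same words DStream from the question):
// A reduce function with explicit Int parameter types, so no inference is needed at the call site.
val add = (a: Int, b: Int) => a + b
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(add, Seconds(30), Seconds(10))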