
Spark Streaming Window Operation

The following simple code computes a word count over a 30-second window with a 10-second slide interval.

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.api.java.function._
import org.apache.spark.streaming.api._
import org.apache.spark.storage.StorageLevel

val ssc = new StreamingContext(sc, Seconds(5))

// read from text file
val lines0 = ssc.textFileStream("test")
val words0 = lines0.flatMap(_.split(" "))

// read from socket
val lines1 = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val words1 = lines1.flatMap(_.split(" "))

val words = words0.union(words1)
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

wordCounts.print()
ssc.checkpoint(".")
ssc.start()
ssc.awaitTermination()

However, I am getting an error from this line:

val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

Specifically, the error comes from _ + _. The error is:

51: error: missing parameter type for expanded function ((x$2, x$3) => x$2.$plus(x$3))

Could anybody tell me what the problem is? Thanks!

asked Jul 22 '14 16:07 by user2895478



1 Answer

This is easy to fix: just be explicit about the types.
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow((a:Int,b:Int)=>a+b, Seconds(30), Seconds(10))

The reason Scala can't infer the type in this case is explained in this answer.
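
As a rough illustration of why the explicit types help, here is a minimal plain-Scala sketch. It assumes the cause is overload resolution: reduceByKeyAndWindow has several overloads, and older Scala compilers (such as the 2.10 line that Spark 1.x was built against) appear to type a function-literal argument without an expected type when more than one overload could apply, so _ + _ has nothing to infer its parameter types from. The reduceWindow methods below are hypothetical stand-ins, not Spark's real signatures; they only mimic the shape of the problem.

```scala
object OverloadInferenceSketch {
  // Hypothetical overloaded API (an assumption for illustration, NOT Spark's signatures).
  // Overload 1: a reduce function plus window and slide lengths in seconds.
  def reduceWindow(f: (Int, Int) => Int, windowSec: Int, slideSec: Int): Int =
    f(windowSec, slideSec)

  // Overload 2: a reduce function, an inverse function, and a window length.
  def reduceWindow(f: (Int, Int) => Int, inv: (Int, Int) => Int, windowSec: Int): Int =
    inv(f(windowSec, windowSec), windowSec)

  // reduceWindow(_ + _, 30, 10)
  // ^ may be rejected with "missing parameter type" on older compilers,
  //   because both overloads take three arguments.

  // Fix 1: spell out the parameter types of the function literal.
  val explicitLambda: Int = reduceWindow((a: Int, b: Int) => a + b, 30, 10)

  // Fix 2: keep the placeholder syntax but ascribe a type to each placeholder.
  val typedPlaceholders: Int = reduceWindow((_: Int) + (_: Int), 30, 10)

  def main(args: Array[String]): Unit = {
    println(explicitLambda)    // prints 40
    println(typedPlaceholders) // prints 40
  }
}
```

Either form gives the compiler a fully typed function before it chooses an overload, so the same approach should carry over to the reduceByKeyAndWindow call, for example by passing (_: Int) + (_: Int) instead of _ + _.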

answered Sep 28 '22 00:09 by aaronman