
Spark streaming example calls updateStateByKey with additional parameters

Wondering why the StatefulNetworkWordCount.scala example calls the infamous updateStateByKey() function, which is supposed to take only a function as a parameter, but instead does:

val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
  new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD)

Why the need to pass a partitioner, a boolean, and an RDD, and how does that get processed? This is not in the signature of updateStateByKey().

thanks, Matt

matthieu lieber asked Sep 28 '22

1 Answer

It is because:

  1. You are looking at a different Spark release branch: https://github.com/apache/spark/blob/branch-1.3/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala. In Spark 1.2 this example called updateStateByKey with just a single function as a parameter, while in 1.3 it was optimized
  2. Different versions of updateStateByKey exist in both 1.2 and 1.3, but the version with 4 parameters was introduced only in 1.3: https://github.com/apache/spark/blob/branch-1.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

Here is the code:

/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of each key.
 * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
 * @param updateFunc State update function. Note, that this function may generate a different
 *                   tuple with a different key than the input key. Therefore keys may be removed
 *                   or added in this way. It is up to the developer to decide whether to
 *                   remember the partitioner despite the key being changed.
 * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
 *                    DStream
 * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
 * @param initialRDD initial state value of each key.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    rememberPartitioner: Boolean,
    initialRDD: RDD[(K, S)]
  ): DStream[(K, S)] = {
  new StateDStream(self, ssc.sc.clean(updateFunc), partitioner,
    rememberPartitioner, Some(initialRDD))
}
0x0FFF answered Oct 27 '22