Is there any RDD action that keeps the order?

I want an RDD action that behaves like reduce but does not require the operator to be commutative, i.e. I want the result of the following to always be "123456789":

scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> val result = rdd.someAction{ _+_ }

First, I found fold. The doc of RDD#fold says:

def fold(zeroValue: T)(op: (T, T) ⇒ T): T
Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral "zero value".

Note that the doc only requires the function to be associative, not commutative. However, the result is not as expected:

scala> rdd.fold(""){ _+_ }
res10: String = 312456879
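To see how an associative but non-commutative operator can still produce a scrambled result, the two stages described in the doc (fold each partition, then combine the per-partition results) can be simulated in plain Scala without Spark. This is a sketch under stated assumptions: the four-way split of the data is hypothetical (the real split depends on Spark's parallelism), and `mergeOrder` stands in for whatever order the per-partition results are combined in; `TwoStageFold` and its members are illustrative names, not Spark APIs.

```scala
// Plain-Scala simulation (no Spark) of a two-stage fold:
// fold each partition locally, then combine the per-partition results.
object TwoStageFold {
  // `mergeOrder` lists partition indices in the order their results are combined
  // (assumption: in Spark's fold this would be the order in which tasks finish).
  def twoStageFold[T](partitions: Seq[Seq[T]], zero: T, op: (T, T) => T, mergeOrder: Seq[Int]): T = {
    // Stage 1: element order *within* each partition is preserved.
    val partials: Seq[T] = partitions.map(_.foldLeft(zero)(op))
    // Stage 2: partial results are combined in `mergeOrder`.
    mergeOrder.map(i => partials(i)).foldLeft(zero)(op)
  }

  // Hypothetical split of "1".."9" into four partitions.
  val parts: Seq[Seq[String]] = Seq(Seq("1", "2"), Seq("3", "4"), Seq("5", "6"), Seq("7", "8", "9"))
  val concat: (String, String) => String = _ + _
}

import TwoStageFold._
println(twoStageFold(parts, "", concat, Seq(0, 1, 2, 3))) // 123456789
println(twoStageFold(parts, "", concat, Seq(1, 0, 3, 2))) // 341278956
```

Each partition's own elements stay in order ("12", "34", "56", "789"); only the order in which those partial strings are concatenated changes the final result.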

EDIT: I tried the approach suggested by @dk14, with no luck:

scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res22: String = 341276895

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res23: String = 914856273

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res24: String = 742539618

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res25: String = 271468359
asked Aug 05 '16 by Eastsun



1 Answer

There is no built-in reducing action that satisfies these criteria in Scala, but you can easily implement your own by combining mapPartitions, collect and local reductions:

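import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Fold each partition locally, then merge the partial results on the driver in partition order.
def orderedFold[T : ClassTag](rdd: RDD[T])(zero: T)(f: (T, T) => T): T = {
  rdd.mapPartitions(iter => Iterator(iter.foldLeft(zero)(f))) // one result per partition
    .collect()         // Array ordered by partition index
    .foldLeft(zero)(f) // sequential merge; unlike reduce, also safe for an RDD with no partitions
}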

Merging with collect followed by a local reduction, instead of the asynchronous, unordered merge used by fold (which combines partition results in whatever order the tasks finish), ensures that the global order is preserved.

This, of course, comes with some additional costs, including:

  • a slightly higher memory footprint on the driver.
  • significantly higher latency, since we explicitly wait for all tasks to finish before starting the local reduction.
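The order/latency trade-off above can be illustrated without Spark. In this hypothetical plain-Scala sketch, Futures with random delays stand in for partition tasks (`MergeStrategies`, `mergeInCompletionOrder` and `mergeInPartitionOrder` are illustrative names, not Spark APIs). Merging each partial result as soon as its task finishes mimics the unordered merge, while waiting for every result and folding them in partition order mimics collect followed by a local reduction.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random

// Plain-Scala sketch (no Spark): each Future plays the role of one partition task
// that folds its partition locally and finishes after a random delay.
object MergeStrategies {
  // Hypothetical split of "1".."9" into four partitions.
  val parts: Vector[Vector[String]] =
    Vector(Vector("1", "2"), Vector("3", "4"), Vector("5", "6"), Vector("7", "8", "9"))

  private def partialFold(p: Vector[String]): String = {
    Thread.sleep(Random.nextInt(50).toLong) // simulated, variable task duration
    p.foldLeft("")(_ + _)
  }

  // Fold-like: each partial result is merged as soon as its task finishes,
  // so the merge order is the (nondeterministic) completion order.
  def mergeInCompletionOrder(): String = {
    val lock = new Object
    var acc = ""
    val tasks = parts.map { p =>
      Future {
        val partial = partialFold(p)
        lock.synchronized { acc = acc + partial }
      }
    }
    Await.result(Future.sequence(tasks), 5.seconds)
    lock.synchronized(acc)
  }

  // collect-like: wait for *all* tasks (extra latency), then merge in
  // partition order on the calling thread, so the result is always ordered.
  def mergeInPartitionOrder(): String = {
    val tasks = parts.map(p => Future(partialFold(p)))
    val partials = Await.result(Future.sequence(tasks), 5.seconds)
    partials.foldLeft("")(_ + _)
  }
}

println(MergeStrategies.mergeInCompletionOrder()) // order may differ between runs
println(MergeStrategies.mergeInPartitionOrder())  // 123456789
```

In the ordered version nothing is merged until the slowest task has finished, which is the latency cost listed above; in exchange, the result no longer depends on task timing.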
answered Sep 21 '22 by zero323
