This is a minor wart in one of my Spark jobs that doesn't seem to cause any real problems -- yet it annoys me every time I see it and I fail to come up with a better solution.
Say I have a Scala collection like this:
val myStuff = List(Try(2/2), Try(2/0))
I can partition this list into successes and failures with partition:
val (successes, failures) = myStuff.partition(_.isSuccess)
Which is nice. The implementation of partition only traverses the source collection once to build the two new collections. However, using Spark, the best equivalent I have been able to devise is this:
val myStuff: RDD[Try[???]] = sourceRDD.map(someOperationThatMayFail)
val successes: RDD[???] = myStuff.collect { case Success(v) => v }
val failures: RDD[Throwable] = myStuff.collect { case Failure(ex) => ex }
Aside from the difference of unpacking the Try (which is fine), this also requires traversing the data twice, which is annoying.
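One partial mitigation (just a sketch, with Int standing in for the real element type) is to cache the intermediate RDD so someOperationThatMayFail isn't recomputed on the second pass, although both collects still scan the cached data:
import scala.util.{Try, Success, Failure}
import org.apache.spark.rdd.RDD

// Int stands in for the real element type from the examples above.
val myStuff: RDD[Try[Int]] = sc.parallelize(Seq(Try(2 / 2), Try(2 / 0)))
myStuff.cache()  // keep the mapped data around so both passes reuse it

val successes: RDD[Int]       = myStuff.collect { case Success(v)  => v }
val failures:  RDD[Throwable] = myStuff.collect { case Failure(ex) => ex }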
Is there any better Spark alternative that can split an RDD without multiple traversals? i.e. something with a signature like this, where partition has the behaviour of the Scala collections' partition rather than RDD partitioning:
val (successes: RDD[Try[???]], failures: RDD[Try[???]]) = myStuff.partition(_.isSuccess)
For reference, I previously used something like the below to solve this. The potentially failing operation is de-serializing some data from a binary format, and the failures have become interesting enough that they need to be processed and saved as an RDD rather than something logged.
def someOperationThatMayFail(data: Array[Byte]): Option[MyDataType] = {
  try {
    Some(deserialize(data))
  } catch {
    case e: MyDeserializationError =>
      // Log and swallow the failure; the caller only ever sees None.
      logger.error(e)
      None
  }
}
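Since the failures now need to survive as an RDD rather than just being logged, a Try-returning variant keeps the exception around instead of discarding it (a sketch; MyDataType and deserialize are the same hypothetical pieces as above):
import scala.util.Try

// Try-returning variant: a failure carries its exception instead of collapsing to None.
def someOperationThatMayFail(data: Array[Byte]): Try[MyDataType] =
  Try(deserialize(data))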
Apache Spark's Resilient Distributed Datasets (RDDs) are collections of data too large to fit on a single node, so they must be partitioned across several nodes. Spark partitions RDDs automatically and distributes the partitions across the cluster.
By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
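As a quick illustration (the path and partition count below are just placeholders), the partition count can be raised when reading a file:
// Hypothetical HDFS path; 8 is just an example minimum partition count.
val lines = sc.textFile("hdfs:///some/input/path", minPartitions = 8)
println(lines.getNumPartitions)  // at least 8, and never fewer than the number of blocks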
There might be other solutions, but here you go:
Setup:
import scala.util._
val myStuff = List(Try(2/2), Try(2/0))
val myStuffInSpark = sc.parallelize(myStuff)
Execution:
val myStuffInSparkPartitioned = myStuffInSpark.aggregate((List[Try[Int]](), List[Try[Int]]()))(
  // seqOp: within a partition, prepend each element to the successes or failures list
  (accum, curr) => if (curr.isSuccess) (curr :: accum._1, accum._2) else (accum._1, curr :: accum._2),
  // combOp: concatenate the per-partition success and failure lists
  (first, second) => (first._1 ++ second._1, first._2 ++ second._2))
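For the sample list above, the result is a pair of plain Scala Lists on the driver (aggregate is an action, so you get local collections rather than two RDDs), and the two halves can be unpacked directly; the exact exception text may vary:
val (successes, failures) = myStuffInSparkPartitioned
// successes: List(Success(1))
// failures:  List(Failure(java.lang.ArithmeticException: / by zero))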
Let me know if you need an explanation