So I'm running into an issue where a filter I'm using on an RDD can potentially create an empty RDD. I feel that doing a count() in order to test for emptiness would be very expensive, and was wondering if there is a more performant way to handle this situation.
Here is an example of what this issue might look like:
import org.apache.spark.rdd.RDD

val b: RDD[String] = sc.parallelize(Seq("a", "ab", "abc"))
println(b.filter(a => !a.contains("a")).reduce(_+_))
would give the result:

java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$36.apply(RDD.scala:1005)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$36.apply(RDD.scala:1005)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1005)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)
Does anyone have any suggestions for how I should go about addressing this edge case?
map and reduce both operate on the elements of a collection, but they differ in shape: map is a one-to-one transformation, so it cannot collapse many elements into a single one, while reduce combines all elements into one accumulated value using a binary operator. A quick sketch of the difference follows.
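A small illustration on a toy RDD (the values here are only illustrative):

val nums = sc.parallelize(Seq(1, 2, 3))

// map is one-to-one: three inputs produce three outputs
val doubled = nums.map(_ * 2)   // RDD containing 2, 4, 6

// reduce collapses all elements into a single value
val total = nums.reduce(_ + _)  // 6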
cogroup() can be used for much more than just implementing joins. We can also use it to implement intersect by key. Additionally, cogroup() can work on three or more RDDs at once.
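For illustration, a minimal intersect-by-key could be built on cogroup() like this; the helper name intersectByKey is my own, not a Spark API:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Keep only the (key, value) pairs from `left` whose key also appears in `right`.
def intersectByKey[K: ClassTag, V: ClassTag](left: RDD[(K, V)],
                                             right: RDD[(K, V)]): RDD[(K, V)] =
  left.cogroup(right).flatMap { case (k, (leftVals, rightVals)) =>
    // a key survives only if both sides contributed at least one value
    if (rightVals.nonEmpty) leftVals.map(v => (k, v)) else Nil
  }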
fold(): This RDD action aggregates the elements of each partition, and then the results of all the partitions, using a given zero value. reduce(): This RDD action reduces the elements of the dataset using the specified binary operator; it throws on an empty RDD, whereas fold() falls back to its zero value.
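A small contrast, reusing the question's data:

val words = sc.parallelize(Seq("a", "ab", "abc"))

// reduce: binary operator only; throws UnsupportedOperationException when the RDD is empty
words.reduce(_ + _)    // concatenation of all elements (partition order dependent)

// fold: the zero value "" is used for each partition and for the final merge,
// so it also works when the RDD is empty
words.fold("")(_ + _)  // same concatenation, but safe on an empty RDD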
An EmptyRDD has no partitions. Using this logic, we can check the RDD's partitions array: if it is empty, the RDD is an EmptyRDD, and we can avoid saving empty batches to HDFS. Note that an RDD that merely contains no elements (e.g. the result of a filter) still keeps its partitions, so this check only catches a true EmptyRDD; rdd.isEmpty() is the more general test.
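A minimal sketch of that guard before a write (the output path is illustrative):

// Skip the save when the RDD has no partitions at all (a true EmptyRDD).
// Caveat: a filtered RDD keeps its partitions even when they are all empty,
// so rdd.isEmpty() is the more general test.
if (rdd.partitions.nonEmpty) {
  rdd.saveAsTextFile("hdfs:///output/batch")  // illustrative path
}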
Consider .fold("")(_ + _) instead of .reduce(_ + _).
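Applied to the question's example, fold returns the zero value instead of throwing:

import org.apache.spark.rdd.RDD

val b: RDD[String] = sc.parallelize(Seq("a", "ab", "abc"))

// reduce would throw UnsupportedOperationException here; fold falls back to ""
println(b.filter(a => !a.contains("a")).fold("")(_ + _))  // prints an empty string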
How about isEmpty?
scala> val b = sc.parallelize(Seq("a","ab","abc"))
b: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> b.isEmpty
res1: Boolean = false
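Continuing the same REPL session with the filtered RDD from the question (the exact res numbers and RDD ids will vary):

scala> val filtered = b.filter(a => !a.contains("a"))
filtered: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:26

scala> filtered.isEmpty
res2: Boolean = true

scala> if (!filtered.isEmpty) println(filtered.reduce(_ + _))

RDD.isEmpty (available since Spark 1.3) only needs to find a single element in some partition, so it is far cheaper than a full count() for this kind of guard.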