
Reducing potentially empty RDDs

So I'm running into an issue where a filter I'm applying to an RDD can potentially produce an empty RDD. Calling count() just to test for emptiness seems very expensive, so I was wondering if there is a more performant way to handle this situation.

Here is an example of what this issue might look like:

    val b: RDD[String] = sc.parallelize(Seq("a", "ab", "abc"))
    println(b.filter(a => !a.contains("a")).reduce(_ + _))

fails with:

    java.lang.UnsupportedOperationException: empty collection
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$36.apply(RDD.scala:1005)
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$36.apply(RDD.scala:1005)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1005)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)

Does anyone have any suggestions for how I should go about addressing this edge case?

asked Dec 10 '15 by Daniel Imberman



2 Answers

Consider .fold("")(_ + _) instead of .reduce(_ + _). Unlike reduce, fold takes a zero value, so an empty RDD simply yields that zero instead of throwing.
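
A minimal sketch of how that looks with the question's filter (assuming the same sc and sample data in a spark-shell session):

    val b = sc.parallelize(Seq("a", "ab", "abc"))

    // fold takes a zero value; "" is the identity for string concatenation,
    // so an empty filtered RDD yields "" instead of throwing
    // java.lang.UnsupportedOperationException: empty collection
    val result = b.filter(a => !a.contains("a")).fold("")(_ + _)
    println(result) // prints an empty line

Note that fold applies the zero value once per partition and again when merging partition results, so it must be an identity element for the operation; the empty string is exactly that for concatenation.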

answered Sep 21 '22 by Dima


How about using isEmpty?

scala> val b = sc.parallelize(Seq("a","ab","abc"))
b: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> b.isEmpty
res1: Boolean = false
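
Putting that together with the question's filter, a rough sketch (variable names are illustrative):

    val filtered = b.filter(a => !a.contains("a"))

    // isEmpty() is implemented with take(1), so Spark stops as soon as it
    // finds one element, which is far cheaper than count() on a large RDD
    if (filtered.isEmpty()) {
      println("nothing left after the filter")
    } else {
      println(filtered.reduce(_ + _))
    }

Since isEmpty and reduce each trigger a job, consider calling filtered.cache() first if the filtered RDD is expensive to recompute.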
answered Sep 20 '22 by Roberto Congiu