 

Spark: Efficient way to test if an RDD is empty

There is no isEmpty method on RDDs, so what is the most efficient way to test whether an RDD is empty?

asked Feb 11 '15 by Tobber


1 Answer

RDD.isEmpty() will be part of Spark 1.3.0.
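For reference, on Spark 1.3.0 and later you can call the built-in method directly; a minimal sketch:

// Spark 1.3.0+: isEmpty() is built into RDD
val rdd = sc.parallelize(Seq.empty[Int])
rdd.isEmpty()   // true

For earlier versions, the workarounds below apply.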

Based on suggestions in this Apache mail thread and later some comments on this answer, I have done some small local experiments. The best method is take(1).length == 0:

def isEmpty[T](rdd: RDD[T]): Boolean = {
  rdd.take(1).length == 0
}

It should run in O(1), except when the RDD is empty, in which case it is linear in the number of partitions: take(1) scans partitions incrementally until it finds an element, so it normally stops after the first non-empty partition, but on an empty RDD it has to visit every partition.

Thanks to Josh Rosen and Nick Chammas for pointing me to this.

Note: this fails if the RDD is of type RDD[Nothing], e.g. isEmpty(sc.parallelize(Seq())), but this is unlikely to be a problem in practice. isEmpty(sc.parallelize(Seq[Any]())) works fine.
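If you do hit the RDD[Nothing] case, giving the empty Seq an explicit element type avoids it; a minimal sketch:

// Seq() infers Seq[Nothing]; annotate the element type instead
val rdd = sc.parallelize(Seq.empty[String])   // RDD[String]
isEmpty(rdd)                                  // true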


Edits:

  • Edit 1: Added the take(1).length == 0 method, thanks to comments.

My original suggestion: Use mapPartitions.

def isEmpty[T](rdd: RDD[T]): Boolean = {
  rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_ && _)
}

It scales with the number of partitions and is not nearly as clean as take(1). It is, however, robust to RDDs of type RDD[Nothing].
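As a quick check of that robustness, using the helper above:

// Works even though Seq() gives an RDD[Nothing]
isEmpty(sc.parallelize(Seq()))   // true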


Experiments:

I used this code for the timings.

def time(n: Long, f: (RDD[Long]) => Boolean): Unit = {
  val start = System.currentTimeMillis()
  val rdd = sc.parallelize(1L to n, numSlices = 100)
  val result = f(rdd)
  printf("Time: " + (System.currentTimeMillis() - start) + "   Result: " + result)
}

time(1000000000L, rdd => rdd.take(1).length == 0L)
time(1000000000L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1000000000L, rdd => rdd.count() == 0L)
time(1000000000L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1000000000L, rdd => rdd.fold(0)(_ + _) == 0L)

time(1L, rdd => rdd.take(1).length == 0L)
time(1L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1L, rdd => rdd.count() == 0L)
time(1L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1L, rdd => rdd.fold(0)(_ + _) == 0L)

time(0L, rdd => rdd.take(1).length == 0L)
time(0L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(0L, rdd => rdd.count() == 0L)
time(0L, rdd => rdd.takeSample(true, 1).isEmpty)
time(0L, rdd => rdd.fold(0)(_ + _) == 0L)

On my local machine with 3 worker cores I got these results:

Time:    21   Result: false
Time:    75   Result: false
Time:  8664   Result: false
Time: 18266   Result: false
Time: 23836   Result: false

Time:   113   Result: false
Time:   101   Result: false
Time:    68   Result: false
Time:   221   Result: false
Time:    46   Result: false

Time:    79   Result: true
Time:    93   Result: true
Time:    79   Result: true
Time:   100   Result: true
Time:    64   Result: true
answered by Tobber