Checking for equality of RDDs

I'm writing some tests in JUnit and I need to check two Spark RDDs for equality.

One way I thought of doing it is this:

JavaRDD<SomeClass> expResult = ...;
JavaRDD<SomeClass> result = ...;

assertEquals(expResult.collect(), result.collect());

Is there a better way than this?

asked Nov 30 '14 13:11 by Aki K



1 Answer

If the expected result is reasonably small, it's best to collect RDD data and compare it locally (just like you've written).
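One caveat with the question's approach: assertEquals on two collected lists is order-sensitive, because List.equals compares elements position by position. If the order of the RDD's elements isn't meaningful, comparing occurrence counts is one option. The following is a minimal plain-Java sketch that works on the Lists returned by collect(); the class and method names are hypothetical, and it assumes SomeClass implements equals and hashCode consistently.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helpers for comparing the Lists returned by JavaRDD.collect().
 * Plain java.util code; no Spark classes are used here.
 */
final class CollectedComparison {
    private CollectedComparison() {}

    /** Order-sensitive: same elements in the same positions (what List.equals checks). */
    static <T> boolean sameInOrder(List<T> expected, List<T> actual) {
        return expected.equals(actual);
    }

    /** Order-insensitive: same elements with the same number of occurrences. */
    static <T> boolean sameIgnoringOrder(List<T> expected, List<T> actual) {
        return countOccurrences(expected).equals(countOccurrences(actual));
    }

    /** Maps each distinct element to how many times it appears (relies on equals/hashCode). */
    private static <T> Map<T, Integer> countOccurrences(List<T> items) {
        Map<T, Integer> counts = new HashMap<>();
        for (T item : items) {
            counts.merge(item, 1, Integer::sum);
        }
        return counts;
    }
}
```

In a JUnit test this could be used as assertTrue(CollectedComparison.sameIgnoringOrder(expResult.collect(), result.collect())), while plain assertEquals on the lists remains the better choice when order does matter, since its failure message shows both lists.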

When it's necessary to use larger datasets in tests, there are a few other possibilities:

Disclaimer: I'm not familiar enough with the Spark Java API, so I'll write the remaining sample code in Scala. I hope this won't be a problem, since it can either be rewritten in Java or wrapped in a couple of utility functions invoked from Java code.

Method 1: Zip RDDs together and compare item-by-item

This method is only usable if the order of elements in the RDDs is well defined (i.e., the RDDs are sorted).

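// Pair up elements by position and keep only the pairs that differ
val diff = expResult
  .zip(result)
  .collect { case (a, b) if a != b => a -> b } // RDD.collect(partialFunction) acts as filter + map
  .take(100) // bring at most 100 mismatched pairs to the driver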

The diff array will contain up to 100 mismatched pairs. If the RDDs are large and you'd like to bring all of the differences to the driver, use the toLocalIterator method on the filtered RDD instead of take. Avoid the no-argument collect() action here, since it may cause the driver to run out of memory (OOM).

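This method is probably the fastest, since it doesn't require a shuffle, but it can only be used if the order of partitions in the RDDs and the order of items within each partition are well defined. Note also that zip requires both RDDs to have the same number of partitions and the same number of elements in each partition.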
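To make the semantics of Method 1 concrete, here is a Spark-free Java sketch that performs the same position-by-position comparison on two local lists and keeps at most limit mismatched pairs. In the Spark Java API the analogous pipeline would roughly be JavaRDD.zip followed by filter and take; the ZipDiff class below is hypothetical and only illustrates the logic. Like Spark's zip, it refuses inputs whose elements don't line up (here: lists of different lengths).

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical, Spark-free illustration of the zip-and-compare idea from Method 1. */
final class ZipDiff {
    private ZipDiff() {}

    /**
     * Pairs expected.get(i) with actual.get(i) and returns at most `limit`
     * pairs whose elements differ, in index order.
     */
    static <T> List<Map.Entry<T, T>> firstMismatches(List<T> expected, List<T> actual, int limit) {
        if (expected.size() != actual.size()) {
            // Spark's zip also fails when the elements of the two RDDs don't line up.
            throw new IllegalArgumentException("lists must have the same length");
        }
        List<Map.Entry<T, T>> diff = new ArrayList<>();
        // Stop early once `limit` mismatches are found, like take(100) in the Scala version.
        for (int i = 0; i < expected.size() && diff.size() < limit; i++) {
            T a = expected.get(i);
            T b = actual.get(i);
            if (!Objects.equals(a, b)) {
                diff.add(new AbstractMap.SimpleImmutableEntry<>(a, b));
            }
        }
        return diff;
    }
}
```

Because the comparison is purely positional, the same elements in a different order show up as mismatches, which is why this approach only makes sense for sorted or otherwise deterministically ordered RDDs.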

Method 2: Co-group RDDs

This method can be used to test whether the result RDD contains the expected (possibly non-unique) values, in any order.

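// Tag every occurrence with a count of 1 on both sides, then group by value
val diff = expResult.map(_ -> 1)
  .cogroup(result.map(_ -> 1))
  // keep values whose counts differ, paired with (expected count - actual count)
  .collect { case (a, (i1, i2)) if i1.sum != i2.sum => a -> (i1.sum - i2.sum) }
  .take(100)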

The diff array will contain the differing values together with the difference between their counts (occurrences in expResult minus occurrences in result).

For example:

  • If expResult contains a single instance of some value and result doesn't contain that value, the number will be +1;
  • If result contains 3 instances of another value and expResult only 1, the number will be -2.
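The arithmetic behind these two examples can be checked with a plain Java sketch that runs on local lists rather than RDDs. It is not Spark code, and the names CountDiff and countDifferences are hypothetical; it only mirrors what the cogroup version computes: for each value, its number of occurrences in the expected data minus its number in the actual data, keeping only non-zero differences. It assumes the element type implements equals and hashCode consistently.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Spark-free illustration of the count difference computed in Method 2. */
final class CountDiff {
    private CountDiff() {}

    /**
     * For every value whose multiplicity differs, returns
     * (count in expected) - (count in actual). Values that match are omitted.
     */
    static <T> Map<T, Integer> countDifferences(List<T> expected, List<T> actual) {
        Map<T, Integer> diff = new HashMap<>();
        for (T item : expected) {
            diff.merge(item, 1, Integer::sum);   // corresponds to summing the 1s from expResult
        }
        for (T item : actual) {
            diff.merge(item, -1, Integer::sum);  // minus the 1s from result
        }
        diff.values().removeIf(count -> count == 0); // drop values whose counts match
        return diff;
    }
}
```

With expected = [x, y] and actual = [y, y, y], the returned map has x mapped to 1 and y mapped to -2, matching the two bullet points above; an empty map means both inputs contain the same values with the same multiplicities.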

This method will be faster than other options (e.g., subtracting the RDDs from each other), since it requires only one shuffle.

answered Sep 30 '22 07:09 by Wildfire