I'm writing some tests in JUnit, and I need to check two Spark RDDs for equality.
One way I thought of doing it is this:
JavaRDD<SomeClass> expResult = ...;
JavaRDD<SomeClass> result = ...;
assertEquals(expResult.collect(), result.collect());
Is there a better way than this?
If the expected result is reasonably small, it's best to collect the RDD data and compare it locally (just like you've written).
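Comparing the lists returned by collect() only works if both RDDs produce their elements in the same order. When that order isn't guaranteed, one option is to sort copies of both collected lists before comparing them. This is a minimal sketch, and it assumes you can supply a Comparator for the element type. The helper name sortedCopy is hypothetical and not part of Spark or JUnit.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortedCompare {
    // Hypothetical helper: returns a sorted copy, so the caller's list (for example
    // the one returned by JavaRDD.collect()) is left untouched.
    // Assumes the Comparator is consistent with equals(); otherwise two lists with
    // the same elements could still end up in different orders after sorting.
    static <T> List<T> sortedCopy(List<T> items, Comparator<? super T> order) {
        List<T> copy = new ArrayList<>(items);
        copy.sort(order);
        return copy;
    }

    public static void main(String[] args) {
        List<String> expected = List.of("b", "a", "a");
        List<String> actual = List.of("a", "b", "a");
        Comparator<String> order = Comparator.naturalOrder();
        // Same elements in a different order: the sorted copies are equal.
        System.out.println(sortedCopy(expected, order).equals(sortedCopy(actual, order))); // prints true
        System.out.println(sortedCopy(expected, order)); // prints [a, a, b]
    }
}
```

In the JUnit test this could become assertEquals(sortedCopy(expResult.collect(), order), sortedCopy(result.collect(), order)), where order is a Comparator<SomeClass> you define.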
When your tests need larger datasets, there are a few other possibilities:
Disclaimer: I'm not familiar enough with the Spark Java API, so I'll write the sample code below in Scala. I hope that won't be a problem, since the code can either be rewritten in Java or turned into a couple of utility functions called from Java code.
The first method uses zip. It only works if the order of elements in the RDDs is well defined (for example, both RDDs are sorted).
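// Pair the elements of both RDDs by position and keep only the mismatching pairs
val diff = expResult
  .zip(result)
  .collect { case (a, b) if a != b => a -> b } // collect(PartialFunction) is a transformation (filter + map), not an action
  .take(100) // bring at most 100 mismatching pairs to the driver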
The diff array will contain up to 100 mismatching pairs. If the RDDs are big and you'd like to get all the items of diff locally, you can drop .take(100) and iterate over the resulting RDD with the toLocalIterator method. It's better not to use the parameterless collect() action here, since you may run out of memory (OOM).
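This method is probably the fastest, since it doesn't require a shuffle. However, it can only be used if the order of partitions in the RDDs and the order of items within each partition are well defined. In addition, zip assumes that both RDDs have the same number of partitions and the same number of elements in each partition; otherwise Spark throws an exception.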
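For ordered RDDs there is also a driver-side alternative that is not the zip method. You walk both RDDs with JavaRDD.toLocalIterator(), which returns a java.util.Iterator and fetches one partition at a time, and stop after a fixed number of mismatches. Unlike zip, it does not need the two RDDs to have matching partitions. The trade-off is that all data flows through the driver, so it is slower.

The sketch below is stdlib-only so that it runs without Spark. The helper firstMismatches, its limit parameter, and the "index: expected vs actual" message format are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

public class OrderedDiff {
    // Hypothetical helper: walks two iterators in lockstep and records up to `limit`
    // mismatches as "index: expected vs actual". Intended for iterators obtained from
    // JavaRDD.toLocalIterator(), but it works with any Iterator.
    static <T> List<String> firstMismatches(Iterator<T> expected, Iterator<T> actual, int limit) {
        List<String> diffs = new ArrayList<>();
        long index = 0;
        while ((expected.hasNext() || actual.hasNext()) && diffs.size() < limit) {
            if (!expected.hasNext()) {
                // actual is longer than expected
                diffs.add(index + ": missing vs " + actual.next());
            } else if (!actual.hasNext()) {
                // expected is longer than actual
                diffs.add(index + ": " + expected.next() + " vs missing");
            } else {
                T e = expected.next();
                T a = actual.next();
                if (!Objects.equals(e, a)) {
                    diffs.add(index + ": " + e + " vs " + a);
                }
            }
            index++;
        }
        return diffs;
    }

    public static void main(String[] args) {
        List<Integer> expected = List.of(1, 2, 3, 4);
        List<Integer> actual = List.of(1, 5, 3);
        System.out.println(firstMismatches(expected.iterator(), actual.iterator(), 100));
        // prints [1: 2 vs 5, 3: 4 vs missing]
    }
}
```

From a JUnit test it might be called as assertEquals(Collections.emptyList(), OrderedDiff.firstMismatches(expResult.toLocalIterator(), result.toLocalIterator(), 100)), so that a failure message lists the first mismatches.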
The second method uses cogroup. It can be used to test whether the result RDD contains the specified (possibly non-unique) values, in any order:
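// Tag every value with a count of 1 on both sides
val diff = expResult.map(_ -> 1)
  .cogroup(result.map(_ -> 1)) // group the tags of equal values from both RDDs (one shuffle)
  .collect { case (a, (i1, i2)) if i1.sum != i2.sum => a -> (i1.sum - i2.sum) } // keep values whose counts differ
  .take(100)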
The diff array will contain the mismatching values together with the difference between their counts (the count in expResult minus the count in result).
For example, if expResult contains a single instance of some value and result doesn't contain that value, the number will be +1; if result contains 3 instances of another value and expResult only 1, the number will be -2.
This method will be faster than other options (e.g., subtracting RDDs from each other), since it requires only one shuffle.
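The count arithmetic of the cogroup approach can be checked on small, collected data with plain Java maps. This sketch is a local analog, not a distributed implementation. It counts occurrences of each value on both sides and keeps expected-count minus actual-count wherever that is non-zero, which reproduces the +1 and -2 cases from the example. The helper name countDiff is hypothetical, and the helper assumes the element type has proper equals() and hashCode().

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountDiff {
    // Hypothetical helper: for each distinct value, (occurrences in expected) - (occurrences in actual).
    // Values whose counts match are dropped, so an empty map means the lists are equal as multisets.
    static <T> Map<T, Integer> countDiff(List<T> expected, List<T> actual) {
        Map<T, Integer> diff = new HashMap<>();
        for (T value : expected) {
            diff.merge(value, 1, Integer::sum);
        }
        for (T value : actual) {
            diff.merge(value, -1, Integer::sum);
        }
        // Remove values whose counts cancelled out to zero.
        diff.values().removeIf(count -> count == 0);
        return diff;
    }

    public static void main(String[] args) {
        // "x": once in expected, absent from actual -> +1
        // "y": once in expected, three times in actual -> -2
        // "z": once on each side -> dropped
        List<String> expected = List.of("x", "y", "z");
        List<String> actual = List.of("y", "y", "y", "z");
        Map<String, Integer> d = countDiff(expected, actual);
        System.out.println(d.get("x") + " " + d.get("y") + " " + d.size()); // prints 1 -2 2
    }
}
```

On RDDs small enough to collect, countDiff(expResult.collect(), result.collect()) should return an empty map when both RDDs hold the same values with the same multiplicities.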