
DataFrame equality in Apache Spark

Assume df1 and df2 are two DataFrames in Apache Spark, computed using two different mechanisms, e.g., Spark SQL vs. the Scala/Java/Python API.

Is there an idiomatic way to determine whether the two data frames are equivalent (equal, isomorphic), where equivalence is determined by the data (column names and column values for each row) being identical save for the ordering of rows & columns?

The motivation for the question is that there are often many ways to compute a big data result, each with its own trade-offs. As one explores these trade-offs, it is important to maintain correctness, hence the need to check for equivalence/equality on a meaningful test data set.

Asked Jul 03 '15 by Sim
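As a reference point for the definition in the question, here is a plain-Python sketch (an illustration, not Spark code). It assumes each DataFrame is modeled as a list of rows, each row a dict mapping column name to value; in PySpark such dicts can be obtained with `Row.asDict()`. The function names `canonical_row` and `frames_equivalent` are hypothetical.

```python
from collections import Counter

def canonical_row(row):
    # row is a dict {column_name: value}; sorting the items by column
    # name makes the representation independent of column order
    return tuple(sorted(row.items()))

def frames_equivalent(rows1, rows2):
    # Compare the rows as multisets: row order is ignored, but each
    # distinct row must occur the same number of times in both inputs
    return Counter(map(canonical_row, rows1)) == Counter(map(canonical_row, rows2))

df1_rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
df2_rows = [{"name": "b", "id": 2}, {"name": "a", "id": 1}]  # rows and columns reordered
print(frames_equivalent(df1_rows, df2_rows))                  # True
print(frames_equivalent(df1_rows, df1_rows + df1_rows[:1]))   # False: extra duplicate row
```

Two caveats of this toy model: every value must be hashable (lists and dicts, as produced by array and map columns, are not), and with zero rows it cannot see column names at all, so a real check should also compare the schemas while ignoring field order.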


People also ask

How do you know if two DataFrames are equal in Spark?

Check the schemas first; then compute the intersection df3 = df1.intersect(df2) and verify that df1, df2 and df3 all have the same count. However, this only works if there are no duplicate rows: intersect removes duplicates, so two equal DataFrames that contain duplicate rows produce a smaller df3 and the counts no longer match. intersectAll, which keeps duplicates, avoids this problem.
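The effect of duplicate rows on this count check can be modeled in plain Python, with lists of tuples standing in for collected rows (an illustration only, not Spark code; the helper names are hypothetical). `intersect_distinct` mimics `DataFrame.intersect`, which removes duplicates, and `intersect_all` mimics `DataFrame.intersectAll`, which keeps each shared row as many times as it appears in both inputs.

```python
from collections import Counter

def intersect_distinct(rows1, rows2):
    # Models DataFrame.intersect: rows present in both inputs, duplicates removed
    return list(set(rows1) & set(rows2))

def intersect_all(rows1, rows2):
    # Models DataFrame.intersectAll: each shared row is kept
    # min(count in rows1, count in rows2) times
    return list((Counter(rows1) & Counter(rows2)).elements())

def counts_match(rows1, rows2, intersect):
    # The count check: len(df1) == len(df2) == len(df3)
    df3 = intersect(rows1, rows2)
    return len(rows1) == len(rows2) == len(df3)

same_a = [(1,), (1,), (2,)]
same_b = [(2,), (1,), (1,)]
diff_b = [(1,), (2,), (2,)]

print(counts_match(same_a, same_b, intersect_distinct))  # False: df3 has only 2 rows
print(counts_match(same_a, same_b, intersect_all))       # True
print(counts_match(same_a, diff_b, intersect_all))       # False
```

With the distinct intersection, two DataFrames holding exactly the same rows fail the check as soon as a row is duplicated; the multiset intersection accepts them while still rejecting inputs whose duplicates differ.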

How do you check whether two DataFrames are the same?

In pandas, the DataFrame.equals() function tests whether two objects contain the same elements. It compares two Series or DataFrames to see whether they have the same shape and elements; NaNs in the same location are considered equal.

How do you check equal conditions in PySpark?

In the pandas API on Spark (pyspark.pandas), DataFrame.eq() compares every value in the DataFrame with a given value and returns True where they are equal and False otherwise. The == operator does the same.


1 Answer

Scala (see below for PySpark)

The spark-fast-tests library has two methods for making DataFrame comparisons (I'm the creator of the library):

The assertSmallDataFrameEquality method collects both DataFrames on the driver node and compares them:

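```scala
import org.apache.spark.sql.DataFrame

// DataFrameSchemaMismatch, DataFrameContentMismatch and the *Message
// helpers are defined elsewhere in spark-fast-tests
def assertSmallDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = {
  // The schemas (column names, types, nullability and column order) must match exactly
  if (!actualDF.schema.equals(expectedDF.schema)) {
    throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF))
  }
  // Collect both DataFrames to the driver; sameElements compares the
  // arrays position by position, so row order matters
  if (!actualDF.collect().sameElements(expectedDF.collect())) {
    throw new DataFrameContentMismatch(contentMismatchMessage(actualDF, expectedDF))
  }
}
```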

The assertLargeDataFrameEquality method compares DataFrames that are distributed across multiple machines (the code is largely copied from spark-testing-base):

```scala
import org.apache.spark.sql.DataFrame

def assertLargeDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = {
  if (!actualDF.schema.equals(expectedDF.schema)) {
    throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF))
  }
  try {
    // Cache both RDDs because each one is traversed more than once below
    actualDF.rdd.cache
    expectedDF.rdd.cache

    val actualCount = actualDF.rdd.count
    val expectedCount = expectedDF.rdd.count
    if (actualCount != expectedCount) {
      throw new DataFrameContentMismatch(countMismatchMessage(actualCount, expectedCount))
    }

    // zipWithIndex is a library helper that keys every row by its position;
    // joining on that index compares rows pairwise, so row order matters here too
    val actualIndexValue = zipWithIndex(actualDF.rdd)
    val expectedIndexValue = zipWithIndex(expectedDF.rdd)

    val unequalRDD = actualIndexValue
      .join(expectedIndexValue)
      .filter {
        case (idx, (r1, r2)) =>
          !(r1.equals(r2) || RowComparer.areRowsEqual(r1, r2, 0.0))
      }

    // Report at most 10 mismatched rows
    val maxUnequalRowsToShow = 10
    assertEmpty(unequalRDD.take(maxUnequalRowsToShow))
  } finally {
    actualDF.rdd.unpersist()
    expectedDF.rdd.unpersist()
  }
}
```

assertSmallDataFrameEquality is faster for small DataFrame comparisons and I've found it sufficient for my test suites.

PySpark

Here's a simple function that returns True if the DataFrames have the same schema and the same rows in the same order:

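```python
def are_dfs_equal(df1, df2):
    # Schemas must match exactly (column names, types, nullability and column order)
    if df1.schema != df2.schema:
        return False
    # collect() brings every row to the driver; list comparison is
    # positional, so the rows must also be in the same order
    if df1.collect() != df2.collect():
        return False
    return True
```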
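The function above is sensitive to both row order and column order. A minimal order-insensitive variant is sketched below, assuming that column names are unique within each DataFrame and that every collected value is hashable (array and map columns, which collect as lists and dicts, are not supported). It reorders the columns alphabetically with `select`, compares schemas, and then compares the collected rows as multisets with `collections.Counter`. The function names are hypothetical; this is not part of chispa or spark-fast-tests.

```python
from collections import Counter

def rows_equal_ignoring_order(rows1, rows2):
    # Multiset comparison: same rows, same number of times, any order
    return Counter(rows1) == Counter(rows2)

def are_dfs_equal_ignoring_order(df1, df2):
    # Reorder both DataFrames' columns alphabetically so column order
    # affects neither the schema comparison nor the row tuples
    df1 = df1.select(*sorted(df1.columns))
    df2 = df2.select(*sorted(df2.columns))
    # Names, types and nullability must still match
    if df1.schema != df2.schema:
        return False
    # collect() returns Row objects (tuple subclasses), so Counter can
    # hash them as long as every value is hashable
    return rows_equal_ignoring_order(df1.collect(), df2.collect())
```

Like the original, this collects everything on the driver, so it only suits small test DataFrames. For DataFrames too large to collect, one option is to check that `df1.exceptAll(df2)` and `df2.exceptAll(df1)` both have no rows after aligning the columns.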

You'll typically perform DataFrame equality comparisons in a test suite and will want a descriptive error message when the comparisons fail (a True / False return value doesn't help much when debugging).

Use the chispa library's assert_df_equality method, which provides descriptive error messages when the comparison fails and fits test-suite workflows.

Answered Oct 06 '22 by Powers