Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Compare a pyspark dataframe to another dataframe

I have 2 data frames to compare both have the same number of columns and the comparison result should have the field that is mismatching and the values along with the ID.

Dataframe one

+-----+---+--------+
| name| id|    City|
+-----+---+--------+
|  Sam|  3| Toronto|
| BALU| 11|     YYY|
|CLAIR|  7|Montreal|
|HELEN| 10|  London|
|HELEN| 16|  Ottawa|
+-----+---+--------+

Dataframe two

+-------------+-----------+-------------+
|Expected_name|Expected_id|Expected_City|
+-------------+-----------+-------------+
|          SAM|          3|      Toronto|
|         BALU|         11|          YYY|
|        CLARE|          7|     Montreal|
|        HELEN|         10|        Londn|
|        HELEN|         15|       Ottawa|
+-------------+-----------+-------------+

Expected Output

+---+------------+--------------+-----+
| ID|Actual_value|Expected_value|Field|
+---+------------+--------------+-----+
|  7|       CLAIR|         CLARE| name|
|  3|         Sam|           SAM| name|
| 10|      London|         Londn| City|
+---+------------+--------------+-----+

Code

Create example data

from pyspark.sql import SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession

sc = SparkContext()
sql_context = SQLContext(sc)

spark = SparkSession.builder.getOrCreate()

spark.sparkContext.setLogLevel("ERROR") # log only on fails

df_Actual = sql_context.createDataFrame(
    [("Sam", 3,'Toronto'), ("BALU", 11,'YYY'), ("CLAIR", 7,'Montreal'), 
     ("HELEN", 10,'London'), ("HELEN", 16,'Ottawa')],
    ["name", "id","City"]
)

df_Expected = sql_context.createDataFrame(
     [("SAM", 3,'Toronto'), ("BALU", 11,'YYY'), ("CLARE", 7,'Montreal'), 
      ("HELEN", 10,'Londn'), ("HELEN", 15,'Ottawa')],
     ["Expected_name", "Expected_id","Expected_City"]
)

Create empty dataframe for Result

field = [
    StructField("ID",StringType(), True),
    StructField("Actual_value", StringType(), True), 
    StructField("Expected_value", StringType(), True),
    StructField("Field", StringType(), True)
]

schema = StructType(field)
Df_Result = sql_context.createDataFrame(sc.emptyRDD(), schema)

Join expected and actual on id's

df_cobined = df_Actual.join(df_Expected, (df_Actual.id == df_Expected.Expected_id))

col_names=df_Actual.schema.names

Loop through each column to find mismatches

for col_name in col_names:

    #Filter for column values not matching
    df_comp= df_cobined.filter(col(col_name)!=col("Expected_"+col_name ))\
        .select(col('id'),col(col_name),col("Expected_"+col_name ))

    #Add not matching column name
    df_comp = df_comp.withColumn("Field", lit(col_name))

    #Add to final result
    Df_Result = Df_Result.union(df_comp)
Df_Result.show()

This code works as expected. However, in the real case, I have more columns and millions of rows to compare. With this code, it takes more time to finish the comparison. Is there a better way to increase the performance and get the same result?

like image 905
Shijo Avatar asked Aug 16 '18 13:08

Shijo


People also ask

Can you compare two DataFrames?

The compare method in pandas shows the differences between two DataFrames. It compares two data frames, row-wise and column-wise, and presents the differences side by side. The compare method can only compare DataFrames of the same shape, with exact dimensions and identical row and column labels.

What is the difference between PySpark DataFrame and pandas DataFrame?

Due to parallel execution on all cores on multiple machines, PySpark runs operations faster than Pandas, hence we often required to covert Pandas DataFrame to PySpark (Spark with Python) for better performance. This is one of the major differences between Pandas vs PySpark DataFrame.

What is eqNullSafe?

eqNullSafe (other) Equality test that is safe for null values.


2 Answers

One way to avoid doing the union is the following:

  • Create a list of columns to compare: to_compare
  • Next select the id column and use pyspark.sql.functions.when to compare the columns. For those with a mismatch, build an array of structs with 3 fields: (Actual_value, Expected_value, Field) for each column in to_compare
  • Explode the temp array column and drop the nulls
  • Finally select the id and use col.* to expand the values from the struct into columns.

Code:

StructType to store the mismatched fields.

import pyspark.sql.functions as f

# these are the fields you want to compare
to_compare = [c for c in df_Actual.columns if c != "id"]

df_new = df_cobined.select(
        "id", 
        f.array([
            f.when(
                f.col(c) != f.col("Expected_"+c), 
                f.struct(
                    f.col(c).alias("Actual_value"),
                    f.col("Expected_"+c).alias("Expected_value"),
                    f.lit(c).alias("Field")
                )
            ).alias(c)
            for c in to_compare
        ]).alias("temp")
    )\
    .select("id", f.explode("temp"))\
    .dropna()\
    .select("id", "col.*")
df_new.show()
#+---+------------+--------------+-----+
#| id|Actual_value|Expected_value|Field|
#+---+------------+--------------+-----+
#|  7|       CLAIR|         CLARE| name|
#| 10|      London|         Londn| City|
#|  3|         Sam|           SAM| name|
#+---+------------+--------------+-----+
like image 72
pault Avatar answered Oct 06 '22 20:10

pault


Join only those records where expected id equals actual and there is mismatch in any other column:

df1.join(df2, df1.id=df2.id and (df1.name != df2.name or df1.age != df2.age...))

This means you will do for loop only across mismatched rows, instead of whole dataset.

like image 27
vvg Avatar answered Oct 06 '22 22:10

vvg