I am using Spark and Java to compare two data frames.
Once I have converted my CSV files into data frames, I want to highlight exactly what changed between the two.
Both data frames have the same columns.
As you can see, the only difference between the data frames below is the emp_name for emp_id 4 in df2.
Dataset<Row> df1 = spark.read().option("header", "true").csv("/Users/dataframeOne.csv");
Dataset<Row> df2 = spark.read().option("header", "true").csv("/Users/dataframeTwo.csv");
df1.unionAll(df2).except(df1.intersect(df2)).show(true);
Df1
+------+---------+--------+----------+-------+--------+
|emp_id| emp_city|emp_name| emp_phone|emp_sal|emp_site|
+------+---------+--------+----------+-------+--------+
|     3|  Chennai|  rahman|9848022330|  45000|SanRamon|
|     1|Hyderabad|     ram|9848022338|  50000|      SF|
|     2|Hyderabad|   robin|9848022339|  40000|      LA|
|     4|  sanjose|   romin|9848022331|  45123|SanRamon|
+------+---------+--------+----------+-------+--------+
Df2
+------+---------+--------+----------+-------+--------+
|emp_id| emp_city|emp_name| emp_phone|emp_sal|emp_site|
+------+---------+--------+----------+-------+--------+
|     3|  Chennai|  rahman|9848022330|  45000|SanRamon|
|     1|Hyderabad|     ram|9848022338|  50000|      SF|
|     2|Hyderabad|   robin|9848022339|  40000|      LA|
|     4|  sanjose|  romino|9848022331|  45123|SanRamon|
+------+---------+--------+----------+-------+--------+
Difference
+------+--------+--------+----------+-------+--------+
|emp_id|emp_city|emp_name| emp_phone|emp_sal|emp_site|
+------+--------+--------+----------+-------+--------+
|     4| sanjose|  romino|9848022331|  45123|SanRamon|
+------+--------+--------+----------+-------+--------+
How can I highlight the incorrect field, 'romino', in yellow using Java and Spark?
Highlighting something in Spark depends on your GUI, so as a first step I would suggest detecting the differing values and adding the information about the differences as an additional column to the dataframe.
Step 1: Add a suffix to all columns of the two dataframes and join them over the primary key (emp_id):
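import static org.apache.spark.sql.functions.*;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Appends the given suffix to every column name of df.
private static Dataset<Row> withSuffix(Dataset<Row> df, String suffix) {
    for (String name : df.columns()) {
        df = df.withColumnRenamed(name, name + suffix);
    }
    return df;
}
[...]
Dataset<Row> df1 = spark.read().option("header", "true").csv(...);
Dataset<Row> df2 = spark.read().option("header", "true").csv(...);
String[] columns = df1.columns();
// Full outer join on the key, so rows that exist in only one file are kept as well.
Dataset<Row> joined = withSuffix(df1, "_1").join(withSuffix(df2, "_2"),
        col("emp_id_1").eqNullSafe(col("emp_id_2")), "full_outer");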
Step 2: Create a list of Column objects that check whether the value from one table differs from the value in the other table. This list will later be used as the input for the map function.
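List<Column> diffs = new ArrayList<>();
for (String column : columns) {
    // map() expects alternating keys and values: the key is the column name,
    // the value is "old/new" if the two sides differ and null otherwise.
    diffs.add(lit(column));
    diffs.add(when(col(column + "_1").eqNullSafe(col(column + "_2")), null)
            .otherwise(concat_ws("/", col(column + "_1"), col(column + "_2"))));
}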
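To make the comparison rule concrete, here is a plain-Java sketch (no Spark) of what the expression computes for a single pair of joined rows. It assumes both rows are available as maps from column name to value and share the same columns. The class name RowDiff and the method diff are hypothetical and only serve as illustration. Objects.equals plays the role of eqNullSafe, and a null side is left out of the joined string, as concat_ws does. Columns without a difference are simply omitted here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical plain-Java illustration (no Spark) of the per-column comparison.
final class RowDiff {

    // Returns column name -> "old/new" for every column whose values differ.
    // Assumes row2 has the same columns as row1.
    static Map<String, String> diff(Map<String, String> row1, Map<String, String> row2) {
        Map<String, String> differences = new LinkedHashMap<>();
        for (String column : row1.keySet()) {
            String v1 = row1.get(column);
            String v2 = row2.get(column);
            // Objects.equals treats two nulls as equal, like eqNullSafe.
            if (!Objects.equals(v1, v2)) {
                // Like concat_ws, leave out a side that is null.
                String joined = v1 == null ? v2 : (v2 == null ? v1 : v1 + "/" + v2);
                differences.put(column, joined);
            }
        }
        return differences;
    }

    public static void main(String[] args) {
        Map<String, String> r1 = new LinkedHashMap<>();
        r1.put("emp_id", "4");
        r1.put("emp_name", "romin");
        Map<String, String> r2 = new LinkedHashMap<>();
        r2.put("emp_id", "4");
        r2.put("emp_name", "romino");
        System.out.println(diff(r1, r2)); // prints {emp_name=romin/romino}
    }
}
```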
Step 3: Create a new column containing a map with all differences (map_filter requires Spark 3.0 or later):
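joined.withColumn("differences", map(diffs.toArray(new Column[0])))
        // Drop the entries of columns whose values are equal (their value is null).
        .withColumn("differences", map_filter(col("differences"), (k, v) -> not(v.isNull())))
        .select("emp_id_1", "differences")
        // Keep only the rows that have at least one difference.
        .filter(size(col("differences")).gt(0))
        .show(false);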
Output:
+--------+--------------------------+
|emp_id_1|differences               |
+--------+--------------------------+
|4       |{emp_name -> romin/romino}|
+--------+--------------------------+
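Once the differing columns are known, the actual highlighting happens in whatever layer displays the data. As a minimal sketch, assuming the rows and the names of the changed columns have been collected to the driver (for example from the differences map above) and that the result is shown as an HTML table, the changed cells could be given a yellow background like this. The class name DiffHighlighter and the method toHtmlRow are hypothetical helpers, not part of Spark.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Spark: renders one row as an HTML table row
// and gives the cells of changed columns a yellow background.
final class DiffHighlighter {

    // Escapes the characters that are special in HTML text.
    static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    // row maps column name -> value; changedColumns holds the names to highlight.
    static String toHtmlRow(Map<String, String> row, Set<String> changedColumns) {
        StringBuilder sb = new StringBuilder("<tr>");
        for (Map.Entry<String, String> cell : row.entrySet()) {
            String value = cell.getValue() == null ? "" : escape(cell.getValue());
            if (changedColumns.contains(cell.getKey())) {
                sb.append("<td style=\"background-color: yellow\">").append(value).append("</td>");
            } else {
                sb.append("<td>").append(value).append("</td>");
            }
        }
        return sb.append("</tr>").toString();
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("emp_id", "4");
        row.put("emp_name", "romino");
        // prints <tr><td>4</td><td style="background-color: yellow">romino</td></tr>
        System.out.println(toHtmlRow(row, Collections.singleton("emp_name")));
    }
}
```

The keys of the differences map for a row can serve as the changedColumns argument. For a console or a different GUI toolkit, only the markup written by toHtmlRow would need to change.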