I have two DataFrames and I am trying to find the difference between them. Both contain arrays of structs. I do not need one of the keys in that struct, so I first removed it and then converted the array to a JSON string. When comparing, I need to know how many elements of that array (JSON) changed. Is there a way to do that in Spark?
Both base_data_set and target_data_set contain ID and KEY. KEY is an array<Struct>:
root
|-- id: string (nullable = true)
|-- result: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key1: integer (nullable = true)
| | |-- key3: string (nullable = false)
| | |-- key2: string (nullable = true)
| | |-- key4: string (nullable = true)
val temp_base = base_data_set
  .withColumn("base_result", explode(base_data_set(RESULT)))
  .withColumn("base",
    struct($"base_result.key1", $"base_result.key2", $"base_result.key3"))
  .groupBy(ID)
  .agg(to_json(collect_list("base")).as("base_picks"))

val temp_target = target_data_set
  .withColumn("target_result", explode(target_data_set(RESULT)))
  .withColumn("target",
    struct($"target_result.key1", $"target_result.key2", $"target_result.key3"))
  .groupBy(ID)
  .agg(to_json(collect_list("target")).as("target_picks"))

val common_keys = temp_base
  .join(temp_target, temp_base(ID) === temp_target(ID))
  .drop(temp_target(ID))
  .withColumn("isModified", $"base_picks" =!= $"target_picks")
The comparison reports a difference even if only 1 item changes, but I need it to report one only when more than n (say n = 3) elements in the array changed. Can someone please advise me on how to achieve this?
I'm not quite sure if this is what you mean because some parts of your question are not easy to understand (at least for me).
I used two json files to simulate your schema. They look like this:
base_data_set:
{ "id": 1, "result": [ {"key1": 23, "key2": "qwerty", "key3": "abc"}, {"key1": 24, "key2": "asdf", "key3": "abc"}, {"key1": 25, "key2": "xcv", "key3": "abc"}]}
{ "id": 2, "result": [ {"key1": 23, "key2": "qwerty", "key3": "abc"}, {"key1": 24, "key2": "asdf", "key3": "abc"}, {"key1": 25, "key2": "xcv", "key3": "abc"}]}
{ "id": 3, "result": [ {"key1": "1", "key2": "2", "key3": "3"}, {"key1": "4", "key2": "5", "key3": "6"}, {"key1": "7", "key2": "8", "key3": "9"}]}
{ "id": 4, "result": [ {"key1": "4", "key2": "5", "key3": "6"}, {"key1": "1", "key2": "2", "key3": "3"}, {"key1": "7", "key2": "8", "key3": "9"}]}
target_data_set:
{ "id": 1, "result": [ {"key1": 24, "key2": "qwerty", "key3": "abc"}, {"key1": 24, "key2": "asdf", "key3": "abc"}, {"key1": 25, "key2": "xcv", "key3": "abc"}]}
{ "id": 2, "result": [ {"key1": 23, "key2": "qwertu", "key3": "abc"}, {"key1": 24, "key2": "asdfg", "key3": "abc"}, {"key1": 25, "key2": "xcvv", "key3": "abc"}]}
{ "id": 3, "result": [ {"key1": "1", "key2": "2", "key3": "3"}, {"key1": "4", "key2": "5", "key3": "6"}, {"key1": "7", "key2": "8", "key3": "9"}]}
{ "id": 4, "result": [ {"key1": "1", "key2": "2", "key3": "3"}, {"key1": "4", "key2": "5", "key3": "6"}, {"key1": "7", "key2": "8", "key3": "9"}]}
As you can see, the first row differs in only one of the structs in the result array, while in the second row all structs are different. Rows 3 and 4 show a case where it is not clear to me whether you would consider it a change: the structs are the same in both tables, but their ordering differs in row 4.
Starting with your initial transformations, I removed the to_json function because it converts the structured elements to a string, which makes comparison harder:
import org.apache.spark.sql.functions._
// $"..." needs spark.implicits._ (already in scope in spark-shell)

// Keep only key1..key3 of each struct and collect them back into one array per id.
val temp_base = base_data_set
  .withColumn("base_result", explode(base_data_set("result")))
  .withColumn("base",
    struct($"base_result.key1", $"base_result.key2", $"base_result.key3"))
  .groupBy("id")
  .agg(collect_list("base").as("base_picks"))

val temp_target = target_data_set
  .withColumn("target_result", explode(target_data_set("result")))
  .withColumn("target",
    struct($"target_result.key1", $"target_result.key2", $"target_result.key3"))
  .groupBy("id")
  .agg(collect_list("target").as("target_picks"))

// Join both sides on id; isModified is true as soon as the arrays differ at all.
val common_keys = temp_base
  .join(temp_target, temp_base("id") === temp_target("id"))
  .drop(temp_target("id"))
  .withColumn("isModified", $"base_picks" =!= $"target_picks")
Afterwards you can use a user-defined function to compare the results of collect_list. It takes the content of two columns and counts how many elements are different:
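import org.apache.spark.sql.functions.udf
import scala.collection.mutable
// Pair the elements up by position and count the pairs that are not equal.
val numChangedStruct = udf {
  (left: mutable.WrappedArray[Object], right: mutable.WrappedArray[Object]) =>
    left.zip(right).count(x => !x._1.equals(x._2))
}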
And applied:
common_keys.withColumn("numChangedStruct", numChangedStruct($"base_picks", $"target_picks")).show(20, false)
+---+----------------------------------------------+------------------------------------------------+----------+----------------+
|id |base_picks |target_picks |isModified|numChangedStruct|
+---+----------------------------------------------+------------------------------------------------+----------+----------------+
|1 |[[23,qwerty,abc], [24,asdf,abc], [25,xcv,abc]]|[[24,qwerty,abc], [24,asdf,abc], [25,xcv,abc]] |true |1 |
|3 |[[1,2,3], [4,5,6], [7,8,9]] |[[1,2,3], [4,5,6], [7,8,9]] |false |0 |
|2 |[[23,qwerty,abc], [24,asdf,abc], [25,xcv,abc]]|[[23,qwertu,abc], [24,asdfg,abc], [25,xcvv,abc]]|true |3 |
|4 |[[4,5,6], [1,2,3], [7,8,9]] |[[1,2,3], [4,5,6], [7,8,9]] |true |2 |
+---+----------------------------------------------+------------------------------------------------+----------+----------------+
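To get back to the original question (flag a row only when more than n elements changed), the count can be compared against a threshold. Below is a minimal sketch in plain Scala, so it can be tried without a Spark session. The names ChangeThreshold, countChanged and isModified are hypothetical, and I am assuming that elements present in only one of the two arrays should count as changed too, because zip on its own silently ignores them:

```scala
object ChangeThreshold {
  // Positional comparison, like the UDF above, plus the length difference,
  // because zip stops at the end of the shorter sequence.
  def countChanged[A](left: Seq[A], right: Seq[A]): Int =
    left.zip(right).count { case (l, r) => l != r } +
      math.abs(left.size - right.size)

  // True only when more than n elements differ (assumption: "more than n"
  // is meant strictly, as in the question).
  def isModified[A](left: Seq[A], right: Seq[A], n: Int): Boolean =
    countChanged(left, right) > n
}
```

On the DataFrame itself the same threshold should be a plain column comparison on the counted column, for example .withColumn("isModified", $"numChangedStruct" > 3).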
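However, this positional comparison depends on the ordering of the elements in "result": rows 3 and 4 contain the same structs in both tables, yet row 4 is reported with 2 changes because two of its structs swapped positions.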
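If the order should not matter, one option is to compare the two arrays as multisets instead of position by position. This is only a sketch under that assumption, again in plain Scala. UnorderedDiff and countChangedUnordered are hypothetical names, and taking the larger of the two one-sided differences is my interpretation of "number of changed elements":

```scala
object UnorderedDiff {
  // Seq.diff is a multiset difference: each element of the argument cancels
  // at most one equal element of the receiver, regardless of position.
  def countChangedUnordered[A](left: Seq[A], right: Seq[A]): Int =
    math.max(left.diff(right).size, right.diff(left).size)
}
```

With this definition the data of row 4 yields 0 and row 1 still yields 1. Since Spark hands an array of structs to a Scala UDF as Seq[Row], wrapping it as udf((l: Seq[Row], r: Seq[Row]) => UnorderedDiff.countChangedUnordered(l, r)) should make it usable like numChangedStruct, although I have not run that variant against the sample files.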