Spark: How to get the number of keys changed between two JSONs in Scala?

I have two DataFrames and I am trying to find the difference between them. Both DataFrames contain arrays of structs. I do not need one of the keys in that struct, so I first removed it and then converted the array to a JSON string. When comparing, I need to know how many elements in that array (JSON) changed. Is there a way to do that in Spark?

Both base_data_set and target_data_set contain an id column and a result column; result is an array<struct>:

root
 |-- id: string (nullable = true)
 |-- result: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: integer (nullable = true)
 |    |    |-- key3: string (nullable = false)
 |    |    |-- key2: string (nullable = true)
 |    |    |-- key4: string (nullable = true)

import org.apache.spark.sql.functions.{collect_list, explode, struct, to_json}
import spark.implicits._ // for $"..." columns; assumes an existing SparkSession named spark

// Column names used below
val ID = "id"
val RESULT = "result"

val temp_base = base_data_set
    .withColumn("base_result", explode(base_data_set(RESULT)))
    // keep key1, key2 and key3 only (key4 is not needed)
    .withColumn("base",
        struct($"base_result.key1", $"base_result.key2", $"base_result.key3"))
    .groupBy(ID)
    .agg(to_json(collect_list("base")).as("base_picks"))

val temp_target = target_data_set
    .withColumn("target_result", explode(target_data_set(RESULT)))
    .withColumn("target",
        struct($"target_result.key1", $"target_result.key2", $"target_result.key3"))
    .groupBy(ID)
    .agg(to_json(collect_list("target")).as("target_picks"))

val common_keys = temp_base
    .join(temp_target, temp_base(ID) === temp_target(ID))
    .drop(temp_target(ID))
    .withColumn("isModified", $"base_picks" =!= $"target_picks")

This flags a row as modified even when only 1 item changes, but I need it flagged only when more than n (say n = 3) elements in the array changed. Can someone please advise me on how to achieve this?

asked May 15 '19 at 08:05 by user3407267


1 Answer

I'm not quite sure whether this is what you mean, because some parts of your question are hard to follow (at least for me).

I used two JSON files to simulate your schema. They look like this:

base_data_set:
{ "id": 1,  "result": [ {"key1":  23, "key2": "qwerty", "key3":  "abc"}, {"key1":  24, "key2": "asdf", "key3":  "abc"},  {"key1":  25, "key2": "xcv", "key3":  "abc"}]}
{ "id": 2,  "result": [ {"key1":  23, "key2": "qwerty", "key3":  "abc"}, {"key1":  24, "key2": "asdf", "key3":  "abc"},  {"key1":  25, "key2": "xcv", "key3":  "abc"}]}
{ "id": 3,  "result": [ {"key1":  "1", "key2": "2", "key3":  "3"}, {"key1":  "4", "key2": "5", "key3":  "6"},  {"key1":  "7", "key2": "8", "key3":  "9"}]}
{ "id": 4,  "result": [ {"key1":  "4", "key2": "5", "key3":  "6"}, {"key1":  "1", "key2": "2", "key3":  "3"},   {"key1":  "7", "key2": "8", "key3":  "9"}]}

target_data_set:
{ "id": 1,  "result": [ {"key1":  24, "key2": "qwerty", "key3":  "abc"}, {"key1":  24, "key2": "asdf", "key3":  "abc"},  {"key1":  25, "key2": "xcv", "key3":  "abc"}]}
{ "id": 2,  "result": [ {"key1":  23, "key2": "qwertu", "key3":  "abc"}, {"key1":  24, "key2": "asdfg", "key3":  "abc"},  {"key1":  25, "key2": "xcvv", "key3":  "abc"}]}
{ "id": 3,  "result": [ {"key1":  "1", "key2": "2", "key3":  "3"}, {"key1":  "4", "key2": "5", "key3":  "6"},  {"key1":  "7", "key2": "8", "key3":  "9"}]}
{ "id": 4,  "result": [ {"key1":  "1", "key2": "2", "key3":  "3"}, {"key1":  "4", "key2": "5", "key3":  "6"},  {"key1":  "7", "key2": "8", "key3":  "9"}]}

As you can see, in the first row only one of the structs in the result array differs, while in the second row all structs are different. Rows 3 and 4 show a case where I am not sure whether you would consider it a change: both tables contain the same structs, but in row 4 their order differs.
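If reordering alone should not count as a change (row 4), one option is an order-insensitive count. The plain-Scala sketch below is not part of the original answer: it treats each array as a multiset, where Seq.diff removes one matching occurrence per element regardless of position, and it takes the larger of the two one-sided differences as the number of changed elements. The UnorderedDiff name and the use of String tuples to stand in for the (key1, key2, key3) structs are illustrative assumptions.

```scala
object UnorderedDiff {
  // Hypothetical stand-in for one (key1, key2, key3) struct after the unneeded key is dropped.
  type Pick = (String, String, String)

  // Seq.diff is a multiset difference: each element of `right` cancels at most one
  // equal element of `left`, independent of position. Taking the larger side also
  // covers arrays of different lengths.
  def changedCount(left: Seq[Pick], right: Seq[Pick]): Int =
    math.max(left.diff(right).size, right.diff(left).size)

  def main(args: Array[String]): Unit = {
    // Rows 2 and 4 of the sample data above.
    val base2   = Seq(("23", "qwerty", "abc"), ("24", "asdf", "abc"), ("25", "xcv", "abc"))
    val target2 = Seq(("23", "qwertu", "abc"), ("24", "asdfg", "abc"), ("25", "xcvv", "abc"))
    val base4   = Seq(("4", "5", "6"), ("1", "2", "3"), ("7", "8", "9"))
    val target4 = Seq(("1", "2", "3"), ("4", "5", "6"), ("7", "8", "9"))
    println(changedCount(base2, target2)) // 3: every struct differs
    println(changedCount(base4, target4)) // 0: same structs, different order
  }
}
```

Because Spark's Row compares and hashes by field values, the same body should also work inside a UDF declared over Seq[Row] columns such as base_picks and target_picks.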

Starting with your initial transformations, I removed the to_json function, because it converts the structured elements to a string, which makes comparison harder:

import org.apache.spark.sql.functions.{collect_list, explode, struct}
import spark.implicits._ // for $"..." columns; assumes an existing SparkSession named spark

val temp_base = base_data_set
  .withColumn("base_result", explode(base_data_set("result")))
  .withColumn("base",
    struct($"base_result.key1", $"base_result.key2", $"base_result.key3"))
  .groupBy("id")
  .agg(collect_list("base").as("base_picks")) // array<struct> instead of a JSON string


val temp_target = target_data_set
  .withColumn("target_result", explode(target_data_set("result")))
  .withColumn("target",
    struct($"target_result.key1", $"target_result.key2", $"target_result.key3"))
  .groupBy("id")
  .agg(collect_list("target").as("target_picks"))


val common_keys = temp_base
  .join(temp_target, temp_base("id") === temp_target("id"))
  .drop(temp_target("id"))
  .withColumn("isModified", $"base_picks" =!= $"target_picks")

Afterwards, you can use a user-defined function (UDF) to compare the results of collect_list. It takes the contents of the two columns and counts how many elements differ, position by position:

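import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Spark passes array<struct> columns to a Scala UDF as a Seq of Rows.
// Pairs up the elements at the same index and counts the pairs that differ.
val numChangedStruct = udf {
  (left: Seq[Row], right: Seq[Row]) =>
    left.zip(right).count { case (l, r) => l != r }
}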
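One caveat with the UDF above: zip stops at the end of the shorter array, so if target_picks has more or fewer elements than base_picks, the extra elements are not counted at all. Below is a plain-Scala sketch of the same positional comparison that also counts them, assuming each added or removed element should count as one change. The PositionalDiff name is hypothetical.

```scala
object PositionalDiff {
  // Compares elements at the same index, as the UDF does, then adds one change for
  // every element that has no partner because one array is longer than the other.
  def changedCount[A](left: Seq[A], right: Seq[A]): Int = {
    val changedInOverlap = left.zip(right).count { case (l, r) => l != r }
    changedInOverlap + math.abs(left.length - right.length)
  }
}
```

Inside the UDF, the body would then become PositionalDiff.changedCount(left, right).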

And applied:

common_keys.withColumn("numChangedStruct", numChangedStruct($"base_picks", $"target_picks")).show(20, false)

+---+----------------------------------------------+------------------------------------------------+----------+----------------+
|id |base_picks                                    |target_picks                                    |isModified|numChangedStruct|
+---+----------------------------------------------+------------------------------------------------+----------+----------------+
|1  |[[23,qwerty,abc], [24,asdf,abc], [25,xcv,abc]]|[[24,qwerty,abc], [24,asdf,abc], [25,xcv,abc]]  |true      |1               |
|3  |[[1,2,3], [4,5,6], [7,8,9]]                   |[[1,2,3], [4,5,6], [7,8,9]]                     |false     |0               |
|2  |[[23,qwerty,abc], [24,asdf,abc], [25,xcv,abc]]|[[23,qwertu,abc], [24,asdfg,abc], [25,xcvv,abc]]|true      |3               |
|4  |[[4,5,6], [1,2,3], [7,8,9]]                   |[[1,2,3], [4,5,6], [7,8,9]]                     |true      |2               |
+---+----------------------------------------------+------------------------------------------------+----------+----------------+

However, this solution depends on the ordering of the elements in "result", as you can see from rows with id 3 and 4.
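Two further points. First, Spark documents collect_list as non-deterministic, because the order of the collected rows can change after a shuffle, so the groupBy itself may reorder elements. Second, the question asks to flag a row only when more than n elements changed. On Spark 2.4 or later, one possible sketch (not from the original answer) handles both by replacing explode/collect_list and the UDF with the SQL higher-order functions transform, zip_with and filter through expr, then comparing the count with n. The Item case class, the ChangedStructs object, the inline data and n = 1 are illustrative assumptions. The count is still positional, so a reordered array like row 4 would still be reported as changed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, expr}

// Hypothetical element type mirroring the schema in the question (key4 is ignored).
case class Item(key1: Int, key3: String, key2: String, key4: String)

object ChangedStructs {
  // Rebuilds each struct without key4. transform works element by element on the
  // original array, so no explode/groupBy/collect_list (and no shuffle) is involved.
  private val dropKey4 =
    "transform(result, x -> named_struct('key1', x.key1, 'key2', x.key2, 'key3', x.key3))"

  // zip_with pairs elements by index and pads the shorter array with nulls;
  // NOT (a <=> b) is a null-safe "differs", so padded positions count as changes.
  private val countChanged =
    "size(filter(zip_with(base_picks, target_picks, (a, b) -> NOT (a <=> b)), c -> c))"

  def compare(base: DataFrame, target: DataFrame, n: Int): DataFrame = {
    val b = base.select(col("id"), expr(dropKey4).as("base_picks"))
    val t = target.select(col("id"), expr(dropKey4).as("target_picks"))
    b.join(t, "id")
      .withColumn("numChangedStruct", expr(countChanged))
      // Flag a row only when more than n elements changed.
      .withColumn("isModified", col("numChangedStruct") > n)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("changed-structs").getOrCreate()
    import spark.implicits._

    val base = Seq(
      ("1", Seq(Item(23, "abc", "qwerty", "x"), Item(24, "abc", "asdf", "x"))),
      ("2", Seq(Item(23, "abc", "qwerty", "x"), Item(24, "abc", "asdf", "x")))
    ).toDF("id", "result")
    val target = Seq(
      ("1", Seq(Item(24, "abc", "qwerty", "y"), Item(24, "abc", "asdf", "y"))),
      ("2", Seq(Item(23, "abc", "qwertu", "x"), Item(24, "abc", "asdfg", "x"), Item(25, "abc", "xcv", "x")))
    ).toDF("id", "result")

    compare(base, target, n = 1).orderBy("id").show(false)
    spark.stop()
  }
}
```

The same threshold comparison (numChangedStruct > n) can equally be applied to the column produced by the UDF-based approach.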

answered Oct 12 '22 at 13:10 by moe