I need to update the dataframes's row number column for the delta data. I have implemented the base load's row number as below:
Input Data:
val base = List(List("001", "a", "abc"), List("001", "a", "123"),List("003", "c", "456") ,List("002", "b", "dfr"), List("003", "c", "ytr"))
.map(row => (row(0), row(1), row(2)))
val DS1 = base.toDF("KEY1", "KEY2" ,"VAL")
DS1.show()
+----+----+---+
|KEY1|KEY2|VAL|
+----+----+---+
| 001| a|abc|
| 001| a|123|
| 003| c|456|
| 002| b|dfr|
| 003| c|ytr|
+----+----+---+
Now I have added the row number using a window function as below :
val baseDF = DS1.select(col("KEY1"), col("KEY2"), col("VAL") ,row_number().over(Window.partitionBy(col("KEY1"), col("KEY2")).orderBy(col("KEY1"), col("KEY2").asc)).alias("Row_Num"))
baseDF.show()
+----+----+---+-------+
|KEY1|KEY2|VAL|Row_Num|
+----+----+---+-------+
|001 |a |abc|1 |
|001 |a |123|2 |
|002 |b |dfr|1 |
|003 |c |456|1 |
|003 |c |ytr|2 |
+----+----+---+-------+
Now the delta load comes a below :
val delta = List(List("001", "a", "y45") ,List("002", "b", "444"))
.map(row => (row(0), row(1), row(2)))
val DS2 = delta.toDF("KEY1", "KEY2" ,"VAL")
DS2.show()
+----+----+---+
|KEY1|KEY2|VAL|
+----+----+---+
| 001| a|y45|
| 002| b|444|
+----+----+---+
So the expected updated result should be:
baseDF.show()
|KEY1|KEY2|VAL|Row_Num|
+----+----+---+-------+
|001 |a |abc|1 |
|001 |a |123|2 |
| 001| a|y45|3 | -----> Delta record
|002 |b |dfr|1 |
| 002| b|444|2 | -----> Delta record
|003 |c |456|1 |
|003 |c |ytr|2 |
+----+----+---+-------+
Any suggestions to implement this solution using dataframes/datasets?
Can we achieve the above solution with spark rdd's zipWithIndex
?
One way to add the delta with updated row numbers is to: 1) add column Row_Num
with a large number in DS2
, 2) union baseDF
with it, and 3) calculate the new row numbers, as shown below:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val combinedDF = baseDF.union(
DS2.withColumn("Row_Num", lit(Long.MaxValue))
)
val resultDF = combinedDF.select(
col("KEY1"), col("KEY2"), col("VAL"), row_number().over(
Window.partitionBy(col("KEY1"), col("KEY2")).orderBy(col("Row_Num"))
).alias("New_Row_Num")
)
resultDF.show
+----+----+---+-----------+
|KEY1|KEY2|VAL|New_Row_Num|
+----+----+---+-----------+
| 003| c|456| 1|
| 003| c|ytr| 2|
| 002| b|dfr| 1|
| 002| b|444| 2|
| 001| a|abc| 1|
| 001| a|123| 2|
| 001| a|y45| 3|
+----+----+---+-----------+
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With