I would like to know exactly what "overwrite" does here. Say I have a table "tb1" with the following records (apologies for the rough table formatting):
driver   vin   make        model
martin   abc   ford        escape
john     abd   toyota      camry
amy      abe   chevrolet   malibu
carlos   abf   honda       civic
Now I have a DataFrame (mydf) with the same columns but the following rows:

martin   abf   toyota   corolla
carlos   abg   nissan   versa
After saving this DataFrame to "tb1" with overwrite mode, will it delete the entire contents of "tb1" and write only the two records from mydf?
However, what I would actually like is for overwrite mode to replace only those rows that have the same value in the "driver" column. In that case, of the 4 records in "tb1", mydf would overwrite only the 2 matching records, and the resulting table would be:
driver   vin   make        model
martin   abf   toyota      corolla
john     abd   toyota      camry
amy      abe   chevrolet   malibu
carlos   abg   nissan      versa
Can I achieve this functionality using overwrite mode?
mydf.write.mode(SaveMode.Overwrite).saveAsTable("tb1")
Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.
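To make the effect concrete, here is a minimal sketch in plain Scala (no Spark; the case class and row values just mirror the example tables above). It shows what the saved table contains after an Overwrite: only the new DataFrame's rows, with every previous row gone.

```scala
// Plain-Scala illustration of SaveMode.Overwrite semantics (not Spark code).
case class Car(driver: String, vin: String, make: String, model: String)

// Existing contents of tb1
val tb1 = Seq(
  Car("martin", "abc", "ford", "escape"),
  Car("john", "abd", "toyota", "camry"),
  Car("amy", "abe", "chevrolet", "malibu"),
  Car("carlos", "abf", "honda", "civic")
)

// New DataFrame being saved
val mydf = Seq(
  Car("martin", "abf", "toyota", "corolla"),
  Car("carlos", "abg", "nissan", "versa")
)

// Overwrite discards the old contents entirely: the table afterwards
// is exactly the new data, nothing is merged.
val afterOverwrite = mydf
```

Note that john and amy are gone afterwards, which is exactly the behaviour the question wants to avoid.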
A DataFrame can be either loaded or saved, and Spark SQL provides, as with many other features, different strategies for data persistence.
What you want is to merge two DataFrames on a primary key: replace the old rows with the new rows, and append any extra rows that only exist in the new data. This cannot be achieved with SaveMode.Overwrite or SaveMode.Append.
To do this you need to implement the merge of the two DataFrames on the primary key yourself.
Something like this
val parentDF = ... // existing data
val deltaDF = ...  // new delta to be merged

// Register both as temp views so they can be referenced in SQL
parentDF.createOrReplaceTempView("parentDF")
deltaDF.createOrReplaceTempView("deltaDF")

// Rows in parentDF that will be replaced by the delta
val updateDF = spark.sql(
  "select parentDF.* from parentDF join deltaDF on parentDF.id = deltaDF.id")

// Drop the outdated rows, then add the new ones
val totalDF = parentDF.except(updateDF).union(deltaDF)
totalDF.write.mode(SaveMode.Overwrite).saveAsTable("tb1")
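The except/union step above can be illustrated with plain Scala collections (no Spark; the Car case class and row values just mirror the question's example, with driver as the key in place of id):

```scala
// Plain-Scala illustration of parentDF.except(updateDF).union(deltaDF).
case class Car(driver: String, vin: String, make: String, model: String)

val parentRows = Seq(
  Car("martin", "abc", "ford", "escape"),
  Car("john", "abd", "toyota", "camry"),
  Car("amy", "abe", "chevrolet", "malibu"),
  Car("carlos", "abf", "honda", "civic")
)
val deltaRows = Seq(
  Car("martin", "abf", "toyota", "corolla"),
  Car("carlos", "abg", "nissan", "versa")
)

// updateRows: parent rows whose key also appears in the delta (the join)
val deltaKeys = deltaRows.map(_.driver).toSet
val updateRows = parentRows.filter(r => deltaKeys.contains(r.driver))

// totalRows = parent minus outdated rows, plus the delta (except + union)
val totalRows = parentRows.diff(updateRows) ++ deltaRows
```

The result keeps john and amy untouched while martin and carlos carry the new values, which is exactly the table the question asks for.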
Answering your question:
Can I achieve this functionality using overwrite mode?
No, you can't.
What Overwrite does, in practice, is drop the entire table you want to populate and recreate it with the contents of the new DataFrame you pass it.
To get the result you want, you would do the following:
Save the information of your table to "update" into a new DataFrame:
val dfTable = hiveContext.read.table("table_tb1")
Do a left join between the DataFrame of the table to update (dfTable) and the DataFrame with your new information (mydf), joining on your "PK", which in your case is the driver column.
In the same statement, filter out the records where the mydf("driver") column is not null; the rows that remain are the ones with no match in mydf, i.e. the ones with no update.
val newDf = dfTable
  .join(mydf, dfTable("driver") === mydf("driver"), "leftouter")
  .filter(mydf("driver").isNull)
  .select(dfTable("*")) // keep only the original table's columns
newDf.write.mode(SaveMode.Append).insertInto("table_tb1") /** Info with no changes */
mydf.write.mode(SaveMode.Append).insertInto("table_tb1") /** Info updated */
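The selection done by the left join plus the null filter can be illustrated with plain Scala collections (no Spark; the Car case class and row values mirror the question's example): keep only the table rows whose driver does not appear in mydf, then add mydf's rows back.

```scala
// Plain-Scala illustration of the left join + isNull filter + two appends.
case class Car(driver: String, vin: String, make: String, model: String)

val tableRows = Seq(
  Car("martin", "abc", "ford", "escape"),
  Car("john", "abd", "toyota", "camry"),
  Car("amy", "abe", "chevrolet", "malibu"),
  Car("carlos", "abf", "honda", "civic")
)
val mydfRows = Seq(
  Car("martin", "abf", "toyota", "corolla"),
  Car("carlos", "abg", "nissan", "versa")
)

// leftouter join + filter(mydf("driver").isNull) keeps the rows of the
// table that have no match in mydf:
val updatedDrivers = mydfRows.map(_.driver).toSet
val unchangedRows  = tableRows.filterNot(r => updatedDrivers.contains(r.driver))

// The two appends then rebuild the full table:
val finalRows = unchangedRows ++ mydfRows
```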
That way, you get the result you are looking for.
Regards.