 

spark - scala - save dataframe to a table with overwrite mode

I would like to know what exactly "overwrite" does here. Let's say I have a table "tb1" with the following records (apologies for the rough table formatting):

driver   vin   make        model
martin   abc   ford        escape
john     abd   toyota      camry
amy      abe   chevrolet   malibu
carlos   abf   honda       civic

Now I have the following DataFrame (mydf) with the same columns, but with the following rows/data:

martin   abf   toyota   corolla
carlos   abg   nissan   versa

After saving the above DataFrame to "tb1" with overwrite mode, will it entirely delete the contents of "tb1" and write only the data of mydf (the two records above)?

However, I would like overwrite mode to overwrite only those rows that have the same value in the "driver" column. In that case, of the 4 records in "tb1", mydf would overwrite only the 2 matching records, and the resulting table would be as follows:

driver   vin   make        model
martin   abf   toyota      corolla
john     abd   toyota      camry
amy      abe   chevrolet   malibu
carlos   abg   nissan      versa

Can I achieve this functionality using overwrite mode?

mydf.write.mode(SaveMode.Overwrite).saveAsTable("tb1")
Uday Sagar asked Sep 28 '17 16:09


People also ask

What is Overwrite mode in Spark?

Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.

What are the different save modes in Spark?

A DataFrame can be both loaded and saved, and Spark SQL provides several save modes to control what happens when the target data already exists: Append, Overwrite, ErrorIfExists (the default), and Ignore.
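For reference, the four strategies can be sketched as follows (a minimal illustration, assuming a DataFrame named df already exists; the table name is arbitrary):

```scala
import org.apache.spark.sql.SaveMode

// ErrorIfExists (the default): fail if the target table already exists
df.write.mode(SaveMode.ErrorIfExists).saveAsTable("some_table")

// Append: add the DataFrame's rows to the existing data
df.write.mode(SaveMode.Append).saveAsTable("some_table")

// Overwrite: replace the existing data with the DataFrame's contents
df.write.mode(SaveMode.Overwrite).saveAsTable("some_table")

// Ignore: silently do nothing if the target already exists
df.write.mode(SaveMode.Ignore).saveAsTable("some_table")
```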


2 Answers

What you want is to merge two DataFrames on a primary key: replace the old rows with the new rows, and append any extra rows that are present only in the new data.

This can't be achieved with SaveMode.Overwrite or SaveMode.Append alone.

To do this, you need to implement a merge of the two DataFrames on the primary key.

Something like this (joining on the question's key column, driver; note that if parentDF is read from tb1 itself, you may need to cache or checkpoint it first, since Spark can refuse to overwrite a table it is still reading from):

 // parentDF: the table's current contents; deltaDF: the new rows to merge (mydf)
 val parentDF = spark.table("tb1")
 val deltaDF  = mydf

 // register both as temp views so they can be referenced in the SQL below
 parentDF.createOrReplaceTempView("parentDF")
 deltaDF.createOrReplaceTempView("deltaDF")

 // rows of the parent whose key also appears in the delta (these get replaced)
 val updateDF = spark.sql("select parentDF.* from parentDF join deltaDF on parentDF.driver = deltaDF.driver")

 // keep everything except the replaced rows, then append the new rows
 val totalDF = parentDF.except(updateDF).union(deltaDF)
 totalDF.write.mode(SaveMode.Overwrite).saveAsTable("tb1")
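The drop-then-union semantics above can be mirrored with plain Scala collections, which makes it easy to see what survives the merge (a self-contained sketch; the Car case class and sample rows just restate the question's data):

```scala
case class Car(driver: String, vin: String, make: String, model: String)

val parent = Seq(
  Car("martin", "abc", "ford", "escape"),
  Car("john", "abd", "toyota", "camry"),
  Car("amy", "abe", "chevrolet", "malibu"),
  Car("carlos", "abf", "honda", "civic"))

val delta = Seq(
  Car("martin", "abf", "toyota", "corolla"),
  Car("carlos", "abg", "nissan", "versa"))

// keys present in the delta win; everything else survives unchanged
val deltaKeys = delta.map(_.driver).toSet
val merged = parent.filterNot(c => deltaKeys(c.driver)) ++ delta

merged.foreach(println)
```

Running this prints the four rows of the desired result table: john and amy unchanged, martin and carlos replaced by their delta versions.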
Avishek Bhattacharya answered Sep 23 '22 09:09


Answering your question:

Can I achieve this functionality using overwrite mode?

No, you can't.

What Overwrite does, in practice, is drop the entire table you want to populate and recreate it with only the contents of the new DataFrame you give it.

To get the result you want, you would do the following:

  • Save the information of your table to "update" into a new DataFrame:

    val dfTable = hiveContext.read.table("table_tb1")

  • Do a Left Join between the DF of the table to update (dfTable) and the DF (mydf) containing your new information, joining on your "PK", which in your case is the driver column.

In the same statement, filter for the records where the mydf("driver") column is null; those are the rows that have no match in mydf and therefore no update. After the join, also select only dfTable's columns, since the joined DataFrame otherwise carries duplicated columns from both sides:

    val newDf = dfTable
      .join(mydf, dfTable("driver") === mydf("driver"), "leftouter")
      .filter(mydf("driver").isNull)
      .select(dfTable.columns.map(dfTable(_)): _*)  // keep only the original table's columns
  • After that, truncate your table tb1 and insert both DataFrames, newDf and mydf:


newDf.write.mode(SaveMode.Append).insertInto("table_tb1")  /** rows with no changes */
mydf.write.mode(SaveMode.Append).insertInto("table_tb1")   /** updated rows */

In that way, you can get the result you are looking for.
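As a side note, the left-outer-join-plus-null-filter step can be written more directly as an anti join, a join type Spark supports out of the box (a sketch under the same assumptions as the answer above):

```scala
// rows of the existing table whose driver does NOT appear in mydf
val newDf = dfTable.join(mydf, Seq("driver"), "left_anti")
```

Because an anti join keeps only dfTable's columns, there is no need to filter nulls or drop duplicated columns afterwards.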

Regards.

Erik Barajas answered Sep 23 '22 09:09