Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

spark delta overwrite a specific partition

So I have a dataframe which has a column, file_date. For a given run, the dataframe has only data for one unique file_date. For instance, in a run, let us assume that there are say about 100 records with a file_date of 2020_01_21.

I am writing this data using the following

(df
 .repartition(1)
 .write
 .format("delta")
 .partitionBy("FILE_DATE")
 .mode("overwrite")
 .option("overwriteSchema", "true")
 .option("replaceWhere","FILE_DATE=" + run_for_file_date)
 .mode("overwrite")
 .save("/mnt/starsdetails/starsGrantedDetails/"))

My requirement is to create a folder/partition for every FILE_DATE as there is a good chance that data for a specific file_date will be rerun and the specific file_date’s data has to be overwritten. Unfortunately in the above code, if I don’t place the “replaceWhere” option, it just overwrites data for other partitions too but if I write the above, data seems to be overwriting correctly the specific partition but every time the write is done, I am getting the following error.

Please note I have also set the following spark config before the write:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

But I am still getting the following error:

AnalysisException: "Data written out does not match replaceWhere 'FILE_DATE=2020-01-19'.\nInvalid data would be written to partitions FILE_DATE=2020-01-20.;"

Can you kindly help please.

like image 842
SWDeveloper Avatar asked Dec 13 '22 09:12

SWDeveloper


1 Answers

There are couple of things that need to be in mind while using replaceWhereto overwrite delta partition. Your dataframe must be filtered before writing into partitions for example we have dataframe DF:

enter image description here

When We write this dataframe into delta table then dataframe partition coulmn range must be filtered which means we should only have partition column values within our replaceWhere condition range.

 DF.write.format("delta").mode("overwrite").option("replaceWhere",  "date >= '2020-12-14' AND date <= '2020-12-15' ").save( "Your location")

if we use condition date < '2020-12-15' instead of date <= '2020-12-15' it will give us error:

enter image description here

Other thing is partition column value needed in quotation '2020-12-15' otherwise chances are it will give error.

There is also pull request open for delta overwrite partitionspark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") here https://github.com/delta-io/delta/pull/371 not sure if they are planning to introduce it.

like image 77
Ali Hasan Avatar answered Feb 04 '23 08:02

Ali Hasan