So I have a dataframe with a column, file_date. For a given run, the dataframe only has data for one unique file_date. For instance, in a run, assume there are about 100 records with a file_date of 2020_01_21.
I am writing this data using the following:
(df
.repartition(1)
.write
.format("delta")
.partitionBy("FILE_DATE")
.mode("overwrite")
.option("overwriteSchema", "true")
.option("replaceWhere","FILE_DATE=" + run_for_file_date)
.save("/mnt/starsdetails/starsGrantedDetails/"))
My requirement is to create a folder/partition for every FILE_DATE, as there is a good chance that data for a specific file_date will be rerun, and that file_date’s data has to be overwritten. Unfortunately, in the above code, if I don’t place the “replaceWhere” option, it overwrites data in the other partitions too. With the option, the data does overwrite the specific partition correctly, but every time the write is done I get an error.
Please note I have also set the following spark config before the write:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
But I am still getting the following error:
AnalysisException: "Data written out does not match replaceWhere 'FILE_DATE=2020-01-19'.\nInvalid data would be written to partitions FILE_DATE=2020-01-20.;"
Can you kindly help?
There are a couple of things to keep in mind when using replaceWhere to overwrite a Delta partition.
First, your dataframe must be filtered before writing into partitions. For example, suppose we have a dataframe DF, sketched below:
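A minimal sketch of such a dataframe (the column names and values here are illustrative, not from the original post):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative dataframe whose 'date' partition column spans exactly two days
DF = spark.createDataFrame(
    [(1, "2020-12-14"), (2, "2020-12-14"), (3, "2020-12-15")],
    ["id", "date"],
)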
When we write this dataframe into the Delta table, the dataframe's partition column range must be filtered, which means the dataframe should only contain partition column values within the replaceWhere condition range:
DF.write.format("delta").mode("overwrite").option("replaceWhere", "date >= '2020-12-14' AND date <= '2020-12-15'").save("Your location")
If we use the condition date < '2020-12-15' instead of date <= '2020-12-15', it gives the same kind of "Data written out does not match replaceWhere" error as above, because rows with date = '2020-12-15' would fall outside the replaceWhere range.
The other thing is that the partition column value needs to be in quotes ('2020-12-15'); otherwise chances are it will give an error. Note that the replaceWhere string in the question, "FILE_DATE=" + run_for_file_date, builds the condition without quotes around the value.
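Putting both points together for the question above, here is a sketch of the corrected write. It reuses the question's df and run_for_file_date, and assumes run_for_file_date matches the format stored in FILE_DATE:

from pyspark.sql.functions import col

(df
 # Keep only the partition being replaced, so no rows fall outside the replaceWhere range
 .filter(col("FILE_DATE") == run_for_file_date)
 .repartition(1)
 .write
 .format("delta")
 .partitionBy("FILE_DATE")
 .mode("overwrite")
 # Quote the value so the condition parses as intended
 .option("replaceWhere", "FILE_DATE = '" + run_for_file_date + "'")
 .save("/mnt/starsdetails/starsGrantedDetails/"))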
There is also an open pull request for Delta support of the spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") overwrite mode here: https://github.com/delta-io/delta/pull/371, but I'm not sure if they are planning to introduce it.
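As a side note, until something like that pull request lands, the spark.sql.sources.partitionOverwriteMode setting from the question only takes effect for Spark's built-in file sources, not for Delta. A sketch with plain parquet (the path is illustrative, df as in the question):

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

(df
 .write
 .format("parquet")
 .partitionBy("FILE_DATE")
 .mode("overwrite")
 # With dynamic mode, only the partitions present in df are replaced;
 # other FILE_DATE partitions under the path are left untouched
 .save("/mnt/starsdetails/parquet_example/"))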