Overwrite only some partitions in a partitioned spark Dataset

How can we overwrite a partitioned dataset, but only the partitions we are going to change? For example, when recomputing last week's daily job, we want to overwrite only that week of data.

Default Spark behaviour is to overwrite the whole table, even if only some partitions are going to be written.

asked Apr 24 '18 by Madhava Carrillo

People also ask

How do I change the number of partitions in a Spark data frame?

If you want to increase the partitions of your DataFrame, all you need to run is the repartition() function. It returns a new DataFrame partitioned by the given partitioning expressions; the resulting DataFrame is hash partitioned.
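For instance, a minimal sketch (the DataFrame df and the column name "date" are illustrative assumptions, not from the question):

import org.apache.spark.sql.functions.col

// Hypothetical DataFrame `df`: both calls return a new DataFrame.
val byNumber = df.repartition(8)            // round-robin into exactly 8 partitions
val byColumn = df.repartition(col("date"))  // hash partitioned by the "date" expression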

How does overwrite work in Spark?

Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.
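For instance (the DataFrame and output path here are hypothetical):

// mode("overwrite") replaces whatever already exists at the target.
df.write.mode("overwrite").parquet("/tmp/events")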


2 Answers

Since Spark 2.3.0 this is an option when overwriting a table. To make it work, set the new spark.sql.sources.partitionOverwriteMode setting to dynamic; the dataset must be partitioned, and the write mode must be overwrite. Example in Scala:

spark.conf.set(
  "spark.sql.sources.partitionOverwriteMode", "dynamic"
)
data.write.mode("overwrite").insertInto("partitioned_table")

I recommend doing a repartition based on your partition column before writing, so you won't end up with 400 files per folder.
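A minimal sketch of that, assuming the table is partitioned by a column named "date":

import org.apache.spark.sql.functions.col

// All rows for a given date land in the same task, so each
// partition folder gets one file instead of hundreds.
data
  .repartition(col("date"))
  .write
  .mode("overwrite")
  .insertInto("partitioned_table")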

Before Spark 2.3.0, the best solution was to launch SQL statements to delete those partitions and then write them with mode append.
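A sketch of that approach (the table name, partition column, and date value are assumptions for illustration):

// Spark < 2.3.0: drop the partitions being recomputed, then append the fresh data.
spark.sql(
  "ALTER TABLE partitioned_table DROP IF EXISTS PARTITION (date = '2018-04-17')"
)
data.write.mode("append").insertInto("partitioned_table")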

answered Oct 07 '22 by Madhava Carrillo


Just FYI: PySpark users should make sure to set overwrite=True in insertInto, otherwise the mode will be changed to append,

as the source code shows:

def insertInto(self, tableName, overwrite=False):
    self._jwrite.mode(
        "overwrite" if overwrite else "append"
    ).insertInto(tableName)

And this is how to use it:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
data.write.insertInto("partitioned_table", overwrite=True)

The SQL version works fine as well:

INSERT OVERWRITE TABLE [db_name.]table_name [PARTITION part_spec] select_statement 

See the Spark SQL documentation on INSERT OVERWRITE for details.

answered Oct 07 '22 by Ali Bey