 

How to partition and write DataFrame in Spark without deleting partitions with no new data?

I am trying to save a DataFrame to HDFS in Parquet format using DataFrameWriter, partitioned by three column values, like this:

dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path) 

As mentioned in this question, partitionBy will delete the full existing hierarchy of partitions at path and replace them with the partitions in dataFrame. Since new incremental data for a particular day will come in periodically, what I want is to replace only those partitions in the hierarchy that dataFrame has data for, leaving the others untouched.

To do this it appears I need to save each partition individually using its full path, something like this:

singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890") 

However I'm having trouble understanding the best way to organize the data into single-partition DataFrames so that I can write them out using their full path. One idea was something like:

dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ... 

But foreachPartition operates on an Iterator[Row] which is not ideal for writing out to Parquet format.

I also considered using a select ... distinct eventdate, hour, processtime query to obtain the list of partitions, then filtering the original DataFrame for each of those partitions and saving the results to their full partitioned paths. But the distinct query plus a separate filter and write per partition doesn't seem very efficient, since it would mean a lot of filter/write operations.
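For reference, a rough, untested sketch of what that approach would look like (the path variable is the same one used above; the column types String/Int/Long are assumptions):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// collect the distinct partition values present in the incoming data
val partitions = dataFrame
  .select("eventdate", "hour", "processtime")
  .distinct()
  .collect()

partitions.foreach { row =>
  val eventdate   = row.getString(0)  // assumed String
  val hour        = row.getInt(1)     // assumed Int
  val processtime = row.getLong(2)    // assumed Long

  dataFrame
    .filter(col("eventdate") === eventdate &&
            col("hour") === hour &&
            col("processtime") === processtime)
    .drop("eventdate", "hour", "processtime") // the directory names already encode these
    .write
    .mode(SaveMode.Overwrite)
    .parquet(s"$path/eventdate=$eventdate/hour=$hour/processtime=$processtime")
}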

Is there a cleaner way to preserve the existing partitions for which dataFrame has no data?

Thanks for reading.

Spark version: 2.1

asked Feb 18 '17 by jaywilson




2 Answers

The mode option Append has a catch!

df.write.partitionBy("y", "m", "d")
  .mode(SaveMode.Append)
  .parquet("/data/hive/warehouse/mydbname.db/" + tableName)

I've tested this and it does keep the existing partition files. However, the problem this time is the following: if you run the same code twice (with the same data), it will create new Parquet files instead of replacing the existing ones for the same data (Spark 1.6). So instead of using Append, we can still solve this problem with Overwrite, but applied at the partition level rather than at the table level.

df.write.mode(SaveMode.Overwrite)
  .parquet("/data/hive/warehouse/mydbname.db/" + tableName +
    "/y=" + year + "/m=" + month + "/d=" + day)
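One caveat worth noting (an untested sketch; the y/m/d, year/month/day and tableName names follow the code above): since this writes directly into a single partition directory, df should normally be filtered down to that partition first, and the partition columns dropped so their values are not also stored inside the Parquet files:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

val singleDay = df
  .filter(col("y") === year && col("m") === month && col("d") === day)
  .drop("y", "m", "d") // the directory name already encodes these values

singleDay.write
  .mode(SaveMode.Overwrite)
  .parquet("/data/hive/warehouse/mydbname.db/" + tableName +
    "/y=" + year + "/m=" + month + "/d=" + day)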

See the following link for more information:

Overwrite specific partitions in spark dataframe write method

(I've updated my reply after suriyanto's comment. Thnx.)

answered Sep 23 '22 by newwebdev


This is an old topic, but I was having the same problem and found another solution: just set the partition overwrite mode to dynamic using:

spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic') 

So, my spark session is configured like this:

spark = SparkSession.builder.appName('AppName').getOrCreate()
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
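For reference, a rough Scala equivalent (untested; requires Spark 2.3+, where this setting was introduced). With it in place, the original write from the question replaces only the partitions present in dataFrame and leaves the others untouched:

import org.apache.spark.sql.SaveMode

// only overwrite partitions that appear in the DataFrame being written
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

dataFrame.write
  .mode(SaveMode.Overwrite)
  .partitionBy("eventdate", "hour", "processtime")
  .parquet(path)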
answered Sep 25 '22 by rodrigombs