 

UPSERT in parquet with PySpark

I have parquet files in S3 with the following partitions: year / month / date / some_id. Using Spark (PySpark), each day I would like to UPSERT the last 14 days: replace the existing data in S3 (one parquet file per partition) without deleting the days that are older than 14 days. I tried two save modes:

append - not good enough, because it just adds another file next to the existing one.
overwrite - deletes the past data and the data of other partitions.
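
For context, the daily write currently looks roughly like this (the bucket name and DataFrame name are placeholders):

(
    df
    .write
    .format("parquet")
    .partitionBy("year", "month", "date", "some_id")
    .mode("append")       # adds extra files instead of replacing them
    # .mode("overwrite")  # wipes the partitions older than 14 days as well
    .save("s3://my-bucket/events/")
)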

Is there a way or best practice to overcome that? Should I read all the data from S3 in each run and write it back again? Or maybe rename the files so that append replaces the current file in S3?

Thanks a lot!

asked Mar 03 '23 by Itai Sevitt


2 Answers

I usually do something similar. In my case I run an ETL and append one day of data to a parquet dataset.

The key is to work only with the data you want to write (in my case the current date), partition by the date column, and overwrite just the data for that date.

This will preserve all old data. As an example:

(
    sdf
    .write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("date")
    # only rewrite the partitions present in sdf, all other dates are kept
    .option("partitionOverwriteMode", "dynamic")
    # with the delta format (see below) you could instead use:
    # .option("replaceWhere", "date = '2020-01-27'")
    .save(uri)
)
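
Applied to the layout in the question, a rough sketch (the bucket names, the staging source and the date format are assumptions) that rewrites only the last 14 days of partitions could look like this:

from datetime import date, timedelta

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
# session-wide alternative to the per-write option used above
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

cutoff = str(date.today() - timedelta(days=14))

# load / recompute only the last 14 days; assumes 'date' holds yyyy-MM-dd strings
fresh = spark.read.parquet("s3://my-bucket/staging/").where(F.col("date") >= cutoff)

(
    fresh
    .write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("year", "month", "date", "some_id")
    .save("s3://my-bucket/events/")  # partitions older than 14 days stay untouched
)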

Also, you could take a look at delta.io, which builds on the parquet format and adds features such as ACID transactions, replaceWhere and MERGE (upsert) support.
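
If you go the Delta Lake route, here is a minimal sketch of a true upsert with MERGE, assuming the delta-spark package is installed, a hypothetical table path, and a hypothetical unique key column called event_id:

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# the session must be created with the Delta extensions enabled
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

fresh = spark.read.parquet("s3://my-bucket/staging/")  # last 14 days of new data (assumed path)
target = DeltaTable.forPath(spark, "s3://my-bucket/events_delta/")  # assumed Delta table path

(
    target.alias("t")
    .merge(fresh.alias("s"), "t.event_id = s.event_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)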

answered Mar 30 '23 by villoro


To my knowledge, S3 doesn't have an update operation. Once an object has been written to S3 it cannot be modified in place; you can only replace it with a new object.

As for your concern about having to read all the data: you can restrict the read to the time range you need, and partition pruning makes Spark list and scan only the partitions that fall inside that range.
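
For example, a filter on the partition column is enough to trigger pruning (the path is a placeholder and the 'date' column is assumed to hold yyyy-MM-dd strings):

from datetime import date, timedelta

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
cutoff = str(date.today() - timedelta(days=14))

# only the 'date' partitions matching the filter are listed and scanned
last_two_weeks = spark.read.parquet("s3://my-bucket/events/").where(F.col("date") >= cutoff)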

answered Mar 30 '23 by Ravi