 

Append new data to partitioned parquet files

I am writing an ETL process where I will need to read hourly log files, partition the data, and save it. I am using Spark (in Databricks). The log files are CSV so I read them and apply a schema, then perform my transformations.

My problem is: how can I save each hour's data in parquet format while appending to the existing data set? When saving, I need to partition by 4 columns present in the dataframe.

Here is my save line:

data
    .filter(validPartnerIds($"partnerID"))
    .write
    .partitionBy("partnerID","year","month","day")
    .parquet(saveDestination)

The problem is that if the destination folder exists, the save throws an error; and if the destination doesn't exist, I'm obviously not appending to anything.

I've tried using .mode("append"), but I find that Spark sometimes fails midway through, so I end up losing track of how much of my data has been written and how much I still need to write.
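For reference, the append attempt looks roughly like this (same names as in the save line above; the only change is the save mode):

data
    .filter(validPartnerIds($"partnerID"))
    .write
    .mode("append") // SaveMode.Append: add new files instead of failing when the folder exists
    .partitionBy("partnerID","year","month","day")
    .parquet(saveDestination)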

I am using parquet because the partitioning substantially speeds up my future queries. Also, I must write the data as some file format on disk and cannot use a database such as Druid or Cassandra.
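For context, a typical downstream query filters on the partition columns, so only the matching directories need to be read. Something like this (using the sqlContext pre-defined in the notebook; the partner ID and date values are just an illustration):

val logs = sqlContext.read.parquet(saveDestination)
logs
    .filter($"partnerID" === "somePartner" && $"year" === 2016 && $"month" === 1 && $"day" === 21)
    .count() // partition pruning: only that day's directory is scanned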

Any suggestions for how to partition my dataframe and save the files (either sticking with parquet or using another format) are greatly appreciated.

asked Jan 21 '16 by Saman


People also ask

Can I append data to parquet file?

The version of MATLAB's parquetwrite introduced in R2019a does not currently support appending to preexisting Parquet files on disk.

Can we update data in parquet file?

Parquet data structures are immutable, so the data cannot be edited once written. You can add partitions to a Parquet data set, but you can't edit the existing data in place.

Can Parquet files be partitioned?

Yes. An ORC or Parquet file contains data columns, and you can add partition columns to these files at write time. The data files do not store values for the partition columns; instead, when writing, the files are divided into groups (partitions) based on those column values.

Can you append to a parquet file Python?

Yes. Using the append save mode, you can append a dataframe to an existing parquet data set. To replace it instead, use the overwrite save mode.


2 Answers

If you need to append the files, you definitely have to use append mode. I don't know how many partitions you expect it to generate, but I find that if you have many partitions, partitionBy causes a number of problems (memory and IO issues alike).

If you think that your problem is caused by write operations taking too long, I recommend that you try these two things:

1) Use snappy compression by adding it to the configuration:

conf.set("spark.sql.parquet.compression.codec", "snappy")

2) Disable generation of the metadata files in the hadoopConfiguration on the SparkContext like this:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

The metadata files are somewhat time-consuming to generate (see this blog post), but according to this they are not actually important. Personally, I always disable them and have no issues.
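Put together, in a notebook the two settings might look something like this (a sketch; sc and sqlContext are assumed to be the usual pre-defined context objects, and you can equally set the compression codec on the SparkConf before the context is created):

// parquet compression codec for DataFrame writes
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

// skip generating the _metadata / _common_metadata summary files on write
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")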

If you generate many partitions (> 500), I'm afraid the best I can do is suggest that you look into a solution that does not use append mode; I simply never managed to get partitionBy to work with that many partitions.

answered by Glennie Helles Sindholt


If you're using unsorted partitioning, your data is going to be split across all of your Spark partitions. That means every task will generate and write data to each of your output files.

Consider repartitioning your data by your partition columns before writing, so that all the data for each output file ends up on the same Spark partition:

data
 .filter(validPartnerIds($"partnerID"))
 .repartition([optional integer,] "partnerID","year","month","day")
 .write
 .partitionBy("partnerID","year","month","day")
 .parquet(saveDestination)

See: DataFrame.repartition
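
For example, using the repartition overload that takes both a partition count and columns (200 here is just a placeholder; tune it to your data volume):

data
 .filter(validPartnerIds($"partnerID"))
 .repartition(200, $"partnerID", $"year", $"month", $"day") // 200 is a placeholder shuffle-partition count
 .write
 .partitionBy("partnerID","year","month","day")
 .parquet(saveDestination)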

answered by MrChrisRodriguez