partitionBy & overwrite strategy in an Azure DataLake using PySpark in Databricks

I have a simple ETL process in an Azure environment:

blob storage > Data Factory > data lake raw > Databricks > data lake curated > data warehouse (main ETL).

The datasets for this project are not very big (~1 million rows, 20 columns, give or take), but I would like to keep them properly partitioned in my data lake as Parquet files.

Currently I run some simple logic to figure out where in my lake each file should sit, based on business calendars.

The files look roughly like this:

Year Week Data
2019 01   XXX
2019 02   XXX

I then partition a given file into the following structure, replacing data that already exists and creating new folders for new data:

curated ---
           dataset --
                     Year 2019 
                              - Week 01 - file.pq + metadata
                              - Week 02 - file.pq + metadata
                               - Week 03 - file.pq + metadata #(pre-existing file)

The metadata are the success and commit files that Spark auto-generates.

To this end, I use the following command in PySpark 2.4.3:

pyspark_dataframe.write.mode('overwrite')\
                         .partitionBy('Year','Week').parquet('/curated/dataset')

Now if I use this command on its own, it will overwrite any existing data under the target path, so Week 03 will be lost.

Using spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic") seems to stop the issue and only overwrites the target partitions, but I wonder whether this is the best way to handle files in my data lake.
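
For reference, this is roughly how the setting and the write fit together (untested as written here, with a placeholder path):

# only rewrite the partitions present in the incoming DataFrame; others (e.g. Week 03) are left alone
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

pyspark_dataframe.write.mode('overwrite') \
    .partitionBy('Year', 'Week') \
    .parquet('/curated/dataset')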

I have also found it hard to find any documentation on this feature.

My first instinct was to loop over a single Parquet file and write each partition manually, which would give me greater control, but looping will be slow.

My next thought was to write each partition to a /tmp folder, move each Parquet file, and then replace/create files as needed using the query above, then purge the /tmp folder while creating some sort of metadata log, roughly as sketched below.
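
Something like this untested sketch is what I had in mind (paths are placeholders; dbutils is the Databricks filesystem utility):

staging_path = '/tmp/staging/dataset'   # placeholder staging location
curated_path = '/curated/dataset'       # placeholder curated location

# write the new partitions into staging first
pyspark_dataframe.write.mode('overwrite') \
    .partitionBy('Year', 'Week') \
    .parquet(staging_path)

# then move each Year=/Week= folder into curated, replacing anything already there
for year_dir in dbutils.fs.ls(staging_path):
    if not year_dir.name.startswith('Year='):
        continue  # skip _SUCCESS and other top-level metadata files
    for week_dir in dbutils.fs.ls(year_dir.path):
        target = (curated_path + '/' + year_dir.name + week_dir.name).rstrip('/')
        dbutils.fs.rm(target, recurse=True)                 # drop the old partition if present
        dbutils.fs.mv(week_dir.path, target, recurse=True)  # move the new partition in

dbutils.fs.rm(staging_path, recurse=True)  # purge the staging folder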

Is there a better way/method to do this?

Any guidance would be much appreciated.

The end goal here is to have a clean and safe area for all 'curated' data, while having a log of Parquet files I can read into a data warehouse for further ETL.

asked Mar 03 '20 by Umar.H

1 Answer

I see that you are using Databricks in the Azure stack. I think the most viable and recommended method for you would be to make use of the new Delta Lake project in Databricks:

It provides options for various upserts, merges, and ACID transactions on object stores like S3 or Azure Data Lake Storage. It basically brings the management, safety, isolation, and upserts/merges offered by data warehouses to data lakes. For one pipeline, Apple actually replaced its data warehouses to run solely on Delta in Databricks because of its functionality and flexibility. For your use case, and for many others who use Parquet, it is just a simple change of replacing 'parquet' with 'delta' in order to use its functionality (if you have Databricks). Delta is basically a natural evolution of Parquet, and Databricks has done a great job by adding functionality as well as open-sourcing it.

For your case, I would suggest you try the replaceWhere option provided in Delta. Before making this targeted update, the target table has to be in Delta format.

Instead of this:

dataset.repartition(1).write.mode('overwrite')\
                         .partitionBy('Year','Week').parquet('/curated/dataset')

From https://docs.databricks.com/delta/delta-batch.html:

'You can selectively overwrite only the data that matches predicates over partition columns'

You could try this:

# the replaceWhere predicate keeps Week 03 from being overwritten
dataset.repartition(1).write \
       .format("delta") \
       .mode("overwrite") \
       .partitionBy('Year', 'Week') \
       .option("replaceWhere", "Year == '2019' AND Week >= '01' AND Week <= '02'") \
       .save("/curated/dataset")

Also, if you wish to bring the data down to a single partition before writing, why don't you use coalesce(1), as it avoids a full shuffle?
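
For example, the same write with coalesce (untested sketch):

# coalesce(1) reduces the DataFrame to one partition without a full shuffle
dataset.coalesce(1).write \
       .format("delta") \
       .mode("overwrite") \
       .partitionBy('Year', 'Week') \
       .option("replaceWhere", "Year == '2019' AND Week >= '01' AND Week <= '02'") \
       .save("/curated/dataset")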

From https://mungingdata.com/delta-lake/updating-partitions-with-replacewhere/:

'replaceWhere is particularly useful when you have to run a computationally expensive algorithm, but only on certain partitions'

Therefore, I personally think that using replaceWhere to manually specify your overwrite will be more targeted and computationally efficient than relying only on spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic").

Databricks provides optimizations on Delta tables that make them a faster and much more efficient option than Parquet (hence a natural evolution), through bin-packing and Z-ordering:

From https://docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html (a sketch of the OPTIMIZE command follows after the list below):

  • WHERE (bin-packing)

'Optimize the subset of rows matching the given partition predicate. Only filters involving partition key attributes are supported.'

  • ZORDER BY

'Colocate column information in the same set of files. Co-locality is used by Delta Lake data-skipping algorithms to dramatically reduce the amount of data that needs to be read'.

  • Faster query execution with indexing, statistics, and auto-caching support

  • Data reliability with rich schema validation and transactional guarantees

  • Simplified data pipeline with flexible UPSERT support and unified Structured Streaming + batch processing on a single data source
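
As a rough, untested illustration (the table path is the one from above; Z-order columns must be non-partition columns, so Data from the sample schema stands in for whatever column you filter on most), OPTIMIZE can be run from PySpark via spark.sql():

# compact the files of one partition and Z-order them by a data column
spark.sql("""
    OPTIMIZE delta.`/curated/dataset`
    WHERE Year = '2019'
    ZORDER BY (Data)
""")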

You could also check out the complete documentation of the open source project: https://docs.delta.io/latest/index.html

I also want to say that I do not work for Databricks/Delta Lake; I have just seen their improvements and functionality benefit me in my work.

UPDATE:

The gist of the question is 'replacing data that exists and creating new folders for new data', and doing it in a highly scalable and effective manner.

Using dynamic partition overwrite on Parquet does the job, however I feel that the natural evolution of that method is to use Delta table merge operations, which were basically created to 'integrate data from Spark DataFrames into the Delta Lake'. They provide you with extra functionality and optimizations in merging your data based on how you would want that to happen, and they keep a log of all actions on a table so you can roll back versions if needed.

Delta lake python api(for merge): https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder

databricks optimization: https://kb.databricks.com/delta/delta-merge-into.html#discussion

Using a single merge operation you can specify the condition to merge on, in this case a combination of the year, the week and an id. If the records match (meaning they exist in both your Spark DataFrame and the Delta table, e.g. Week 01 and Week 02), they are updated with the data in your Spark DataFrame, while other records are left unchanged:

# you can also add an additional condition for matched records, but it is not required
.whenMatchedUpdateAll(condition=None)

In cases where nothing matches, you might want to insert and create new rows and partitions; for that you can use:

.whenNotMatchedInsertAll(condition=None)

You can use the convertToDelta operation (https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.convertToDelta) to convert your Parquet table into a Delta table, so that you can perform Delta operations on it using the API.

'You can now convert a Parquet table in place to a Delta Lake table without rewriting any of the data. This is great for converting very large Parquet tables which would be costly to rewrite as a Delta table. Furthermore, this process is reversible'

Your merge case (replacing data where it exists and creating new records where it does not) could go like this:

(I have not tested this; refer to the examples + API docs for the exact syntax.)

%python
from delta.tables import DeltaTable

# partitioned Parquet tables need their partition schema for the conversion
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/curated/dataset`", "Year STRING, Week STRING")

deltaTable.alias("target").merge(
        dataset.alias("dataset"),
        "target.Year = dataset.Year AND target.Week = dataset.Week") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

If the Delta table is partitioned correctly (Year, Week) and you use the whenMatched clause correctly, these operations will be highly optimized and could take seconds in your case. It also gives you consistency, atomicity and data integrity, with the option to roll back.

Some more functionality it provides: you can specify the set of columns to update when a match is made (if you only need to update certain columns). You can also enable spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning", "true"), so that Delta uses minimal targeted partitions to carry out the merge (update, delete, create).
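
For example, a rough, untested sketch using the sample schema above, updating only the Data column on a match:

# enable the Databricks partition-pruning optimization for merges
spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning", "true")

# update only the Data column when Year/Week match; insert everything when they do not
deltaTable.alias("target").merge(
        dataset.alias("dataset"),
        "target.Year = dataset.Year AND target.Week = dataset.Week") \
    .whenMatchedUpdate(set={"Data": "dataset.Data"}) \
    .whenNotMatchedInsertAll() \
    .execute()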

Overall, I think using this approach is a very new and innovative way of carrying out targeted updates, as it gives you more control over them while keeping operations highly efficient. Using Parquet with dynamic partitionOverwriteMode will also work fine; however, Delta Lake features bring data quality to your data lake that is unmatched.

My recommendation: for now, use dynamic partition overwrite mode for Parquet files to do your updates, and experiment with the Delta merge on just one table, with the Databricks optimization spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning", "true") and .whenMatchedUpdateAll(), and compare the performance of both (your files are small, so I do not think it will be a big difference). The Databricks partition-pruning optimization for merges came out in February, so it is really new and could be a game changer for the overhead that Delta merge operations incur (under the hood they just create new files, but partition pruning can speed that up).

Merge examples in Python, Scala and SQL: https://docs.databricks.com/delta/delta-update.html#merge-examples

https://databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html

answered Oct 02 '22 by murtihash