 

Overwrite Hive partitions using Spark

I am working on AWS, and I have workflows that use Spark and Hive. My data is partitioned by date, so every day I get a new partition in my S3 storage. My problem is that when the data load fails one day, I have to re-execute that partition. The code that writes is the following:

df                            // My data in a DataFrame
  .write
  .format(getFormat(target))  // csv by default, but could be parquet, ORC...
  .mode(getSaveMode("overwrite"))  // Append by default, but in the future it should be Overwrite
  .partitionBy(partitionName) // Column of the partition, the date
  .options(target.options)    // header, separator...
  .option("path", target.path) // the path where it will be stored
  .saveAsTable(target.tableName)  // the table name

What happens in my flow? If I use SaveMode.Overwrite, the complete table is deleted and only the new partition is saved. If I use SaveMode.Append, I risk ending up with duplicate data.

Searching around, I found that Hive supports this kind of partition-only overwrite, but only through HQL statements, which I don't have available here.

We need the solution in Hive, so we can't use this alternative option (writing directly to CSV).

I also found this Jira ticket that is supposed to solve the problem I'm having, but trying it with the latest version of Spark (2.3.0), the situation was the same: it deleted the whole table and saved only the new partition, instead of overwriting just the partition contained in my data.

To make this clearer, here is an example:

Partitioned by A

Data:

| A | B | C | 
|---|---|---| 
| b | 1 | 2 | 
| c | 1 | 2 |

Table:

| A | B | C | 
|---|---|---| 
| a | 1 | 2 | 
| b | 5 | 2 | 

What I want: in Table, partition a stays in the table, partition b is overwritten with the Data, and partition c is added. Is there any way to do this with Spark?

My last option would be to first delete the partition that is going to be saved and then use SaveMode.Append, but I would only try this if there is no other solution (a sketch of that workaround is shown below).
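For reference, a minimal sketch of that drop-then-append workaround, assuming a Hive table named target_table partitioned by date_col (both names are hypothetical):

// Hypothetical names: target_table, date_col
val partitionDate = "2018-04-23"

// Drop only the partition that is about to be rewritten.
// Note: for an EXTERNAL table this removes only the metadata,
// so the underlying S3 files would have to be deleted as well.
spark.sql(
  s"ALTER TABLE target_table DROP IF EXISTS PARTITION (date_col = '$partitionDate')"
)

// Re-insert the fresh data; only the dropped partition gets new rows
// (insertInto matches columns by position, partition column last)
df.write.mode("append").insertInto("target_table")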

asked Apr 23 '18 by H. M.

People also ask

How does overwrite work in Spark?

Overwrite is defined as a Spark save mode in which an already existing file is replaced by new content. In simple words, when saving a DataFrame to the data source, if the data/table already exists, then the existing data/table is expected to be overwritten by the contents of the DataFrame.
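A minimal illustration (the output path is hypothetical):

// Replaces whatever already exists at the target path
df.write.mode("overwrite").parquet("s3://my-bucket/my-table/")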

What is dynamic overwrite?

A write that dynamically overwrites partitions removes all existing data in each logical partition for which the write will commit new data. Any existing logical partition for which the write does not contain data will remain unchanged.

How do I change the number of partitions in Spark?

If you want to change the number of partitions of your DataFrame, all you need to run is the repartition() function, which returns a new DataFrame partitioned by the given partitioning expressions.
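For example (the column name is illustrative):

import org.apache.spark.sql.functions.col

// Returns a new DataFrame with 10 partitions
val repartitioned = df.repartition(10)

// Or repartition by an expression, e.g. a column
val byDate = df.repartition(col("date_col"))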


3 Answers

If you are on Spark 2.3.0, try setting spark.sql.sources.partitionOverwriteMode to dynamic. The dataset needs to be partitioned, and the write mode must be overwrite.

// Overwrite only the partitions present in the DataFrame, leaving the rest of the table intact
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")
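One caveat: insertInto matches columns by position rather than by name, and expects the partition column(s) last. With the question's example (a table partitioned by A; the select order is illustrative), that means reordering before the write:

// Non-partition columns first, partition column (A) last,
// matching the physical column order of the target table
data
  .select("B", "C", "A")
  .write
  .mode("overwrite")
  .insertInto("partitioned_table")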
answered Oct 20 '22 by wandermonk


So, if you are using a Spark version < 2.3 and want to write into partitions dynamically without deleting the others, you can implement the solution below.

The idea is to register the dataset as a table and then use spark.sql() to run the INSERT query.

import org.apache.spark.sql.SparkSession

// Create SparkSession with Hive dynamic partitioning enabled
val spark: SparkSession =
    SparkSession
        .builder()
        .appName("StatsAnalyzer")
        .enableHiveSupport()
        .config("hive.exec.dynamic.partition", "true")
        .config("hive.exec.dynamic.partition.mode", "nonstrict")
        .getOrCreate()

// Register the dataframe as a Hive table
impressionsDF.createOrReplaceTempView("impressions_dataframe")

// Create the output Hive table
spark.sql(
    s"""
      |CREATE EXTERNAL TABLE stats (
      |   ad            STRING,
      |   impressions   INT,
      |   clicks        INT
      |) PARTITIONED BY (country STRING, year INT, month INT, day INT)
      |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
    """.stripMargin
)

// Write the data into disk as Hive partitions; only the matching
// partitions are overwritten, the rest of the table is untouched
spark.sql(
    s"""
      |INSERT OVERWRITE TABLE stats
      |PARTITION(country = 'US', year = 2017, month = 3, day)
      |SELECT ad, SUM(impressions), SUM(clicks), day
      |FROM impressions_dataframe
      |GROUP BY ad, day
    """.stripMargin
)
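If all partition columns should be resolved dynamically, which is closer to the original question, the PARTITION clause can list them without values, as long as they appear last in the SELECT (this sketch assumes the registered view also exposes country, year, month and day columns):

// All partition columns dynamic; only the partitions present in the
// result set are overwritten (requires nonstrict mode, set above)
spark.sql(
    s"""
      |INSERT OVERWRITE TABLE stats
      |PARTITION(country, year, month, day)
      |SELECT ad, SUM(impressions), SUM(clicks), country, year, month, day
      |FROM impressions_dataframe
      |GROUP BY ad, country, year, month, day
    """.stripMargin
)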
answered Oct 20 '22 by ravi malhotra


I would suggest running SQL through the SparkSession. You can run an INSERT OVERWRITE ... PARTITION query, selecting the columns from the existing dataset. This solution will overwrite only the affected partitions.
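Applied to the question's example (a table partitioned by A; my_table and new_data are illustrative names), that would look roughly like:

// Register the new data and overwrite only the partitions it contains
df.createOrReplaceTempView("new_data")

// Allow fully dynamic partition values
spark.sql("SET hive.exec.dynamic.partition = true")
spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")

spark.sql(
    s"""
      |INSERT OVERWRITE TABLE my_table
      |PARTITION(A)
      |SELECT B, C, A
      |FROM new_data
    """.stripMargin
)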

answered Oct 20 '22 by Sourav Gulati