 

Drop partitions from Spark

I'm using Java-Spark (Spark 2.2.0).

I'm trying to drop Hive partitions as follow:

spark.sql(""ALTER TABLE backup DROP PARTITION (date < '20180910')"

And got the following exception:

org.apache.spark.sql.catalyst.parser.ParseException: mismatched input '<' expecting {')', ','}(line 1, pos 42)

I know about the open issue ALTER TABLE DROP PARTITION should support comparators, which should have been fixed in my version, but I still get the exception.

What alternative do I have for dropping these partitions from Spark? Is there another way to do this?

Thanks.

Ya Ko asked Sep 27 '18 07:09

2 Answers

It seems there is no way to do this for the time being. As shown in SPARK-14922, the target version for this fix is 3.0.0 and it is still in progress.

As such, there are two possible workarounds in my view.

Let's set up the problem using Spark 2.4.3:

// We create the table
spark.sql("CREATE TABLE IF NOT EXISTS potato (size INT) PARTITIONED BY (hour STRING)")

// Enable dynamic partitioning 
spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")

// Insert some dummy records
(1 to 9).foreach(i => spark.sql(s"INSERT INTO potato VALUES ($i, '2020-06-07T0$i')"))

// Verify inserts
spark.table("potato").count // 9 records

Now, trying to drop a single partition from inside Spark works!

spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour='2020-06-07T01')""")
spark.table("potato").count // 8 records

Trying to drop multiple partitions doesn't work.

spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour="2020-06-07T02", hour="2020-06-07T03")""")

org.apache.spark.sql.catalyst.parser.ParseException:
Found duplicate keys 'hour'.(line 1, pos 34)

== SQL ==
ALTER TABLE potato DROP IF EXISTS PARTITION (hour="2020-06-07T02", hour="2020-06-07T03")
----------------------------------^^^

Using a comparison operator to drop a range of partitions also doesn't work.

spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour<="2020-06-07T03")""")

org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '<=' expecting {')', ','}(line 1, pos 49)

== SQL ==
ALTER TABLE potato DROP IF EXISTS PARTITION (hour<="2020-06-07T03")
-------------------------------------------------^^^

This supposedly happens because the partition column is a string and we are using comparison operators.

The solution I found is:

  1. Get the list of partitions and conditionally filter them.
  2. Either drop the individual partitions one by one, or pass them as a sequence of Map[String,String] (TablePartitionSpec) to the catalog's dropPartitions function.

Step 1:

// Get External Catalog
val catalog = spark.sharedState.externalCatalog

// Get the spec from the list of partitions 
val partitions = catalog.listPartitions("default", "potato").map(_.spec)

// Filter according to the condition you attempted.
val filteredPartitions = partitions.flatten.filter(_._2 <= "2020-06-07T03")
                                           .map(t => Map(t._1 -> t._2))

Step 2:

We pass each argument tuple to an individual ALTER TABLE DROP PARTITION statement.

filteredPartitions.flatten.foreach(t => 
     spark.sql(s"""ALTER TABLE potato DROP IF EXISTS PARTITION (${t._1}="${t._2}")"""))
spark.table("potato").count // 6 records

Or pass them to the catalog's dropPartitions function.

// If you purge data, it gets deleted immediately and isn't moved to trash.
// This takes precedence over retainData, so even if you retainData but purge,
// your data is gone.
catalog.dropPartitions("default", "potato", filteredPartitions,
                       ignoreIfNotExists=true, purge=true, retainData=false)
spark.table("potato").count // 6 records

I hope that was helpful. Let me know if you have a better solution for Spark 2.x.

kfkhalili answered Nov 18 '22 02:11


You can do the same with Spark programming. Note that this is also not fixed in Spark 2.0, 2.1, or 2.2; for reference, see https://issues.apache.org/jira/browse/SPARK-14922

Steps:

  1. Create a Hive context.
  2. Get the table via the getTable method of the Hive context; you need to pass the dbName, the tableName, and a boolean indicating whether to throw an error if the table is not found.
  3. From the table object, hive.getPartitions(table) gives you the partitions from the Hive context (you need to decide which partitions you are going to delete).
  4. Remove partitions using dropPartition with the partition values, table name, and db info (hive.dropPartition).

    hiveContext.getPartitions(table)
    hiveContext.dropPartition(dbName, tableName, partition.getValues(), true)


You need to validate each partition name and check whether it should be deleted or not (you need to write a custom method for this).

Or you can get the partition list using SHOW PARTITIONS in SQL, and from there use ALTER TABLE ... DROP PARTITION to remove each one.
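A minimal sketch of that SHOW PARTITIONS approach might look like the following (assuming the potato table from the first answer, partitioned by a single string column hour; the threshold value and the parsing of single-column "key=value" specs are illustrative assumptions, and a table with multiple partition columns would need the spec split on "/" first):

    // List partitions via SQL; each row is a spec string like "hour=2020-06-07T01".
    val partitionRows = spark.sql("SHOW PARTITIONS potato").collect()

    // Parse each "key=value" spec and keep only those matching our condition.
    val toDrop = partitionRows
      .map(_.getString(0))
      .map { spec =>
        val Array(key, value) = spec.split("=", 2)
        (key, value)
      }
      .filter { case (_, value) => value <= "2020-06-07T03" }

    // Drop each matching partition with an individual ALTER TABLE statement.
    toDrop.foreach { case (key, value) =>
      spark.sql(s"""ALTER TABLE potato DROP IF EXISTS PARTITION ($key="$value")""")
    }

This stays entirely in SQL plus driver-side filtering, so it avoids both the comparator limitation and the external catalog API.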

This may give you some pointers .
Abdul answered Nov 18 '22 02:11