pyspark - getting Latest partition from Hive partitioned column logic

I am new to PySpark. I am trying to get the latest partition (date partition) of a Hive table using PySpark DataFrames, and have done it as shown below. But I am sure there is a better way to do it using DataFrame functions (not by writing SQL). Could you please share suggestions on better ways?

This solution scans the entire data of the Hive table to get the answer.

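# Load the Hive table as a DataFrame
df_1 = sqlContext.table("dbname.tablename")

# Distinct partition values, newest first (this is the step that scans the whole table)
df_1_dates = df_1.select('partitioned_date_column').distinct().orderBy(df_1['partitioned_date_column'].desc())

# Take the first row and read the partition value out of it
lat_date_dict = df_1_dates.first().asDict()
lat_dt = lat_date_dict['partitioned_date_column']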
vinu.m.19 asked Mar 07 '19 21:03



2 Answers

I agree with what @philantrovert mentioned in the comment. You can use the approach below, which relies on partition pruning to limit the number of partitions scanned for your Hive table.

>>> spark.sql("""show partitions test_dev_db.newpartitiontable""").show()
+--------------------+
|           partition|
+--------------------+
|tran_date=2009-01-01|
|tran_date=2009-02-01|
|tran_date=2009-03-01|
|tran_date=2009-04-01|
|tran_date=2009-05-01|
|tran_date=2009-06-01|
|tran_date=2009-07-01|
|tran_date=2009-08-01|
|tran_date=2009-09-01|
|tran_date=2009-10-01|
|tran_date=2009-11-01|
|tran_date=2009-12-01|
+--------------------+

>>> max_date=spark.sql("""show partitions test_dev_db.newpartitiontable""").rdd.flatMap(lambda x:x).map(lambda x : x.replace("tran_date=","")).max()
>>> print(max_date)
2009-12-01
>>> query = "select city,state,country from test_dev_db.newpartitiontable where tran_date ='{}'".format(max_date)

>>> spark.sql(query).show()
+--------------------+----------------+--------------+
|                city|           state|       country|
+--------------------+----------------+--------------+
|         Southampton|         England|United Kingdom|
|W Lebanon        ...|              NH| United States|
|               Comox|British Columbia|        Canada|
|           Gasperich|      Luxembourg|    Luxembourg|
+--------------------+----------------+--------------+

>>> spark.sql(query).explain(True)
== Parsed Logical Plan ==
'Project ['city, 'state, 'country]
+- 'Filter ('tran_date = 2009-12-01)
   +- 'UnresolvedRelation `test_dev_db`.`newpartitiontable`

== Analyzed Logical Plan ==
city: string, state: string, country: string
Project [city#9, state#10, country#11]
+- Filter (tran_date#12 = 2009-12-01)
   +- SubqueryAlias newpartitiontable
      +- Relation[city#9,state#10,country#11,tran_date#12] orc

== Optimized Logical Plan ==
Project [city#9, state#10, country#11]
+- Filter (isnotnull(tran_date#12) && (tran_date#12 = 2009-12-01))
   +- Relation[city#9,state#10,country#11,tran_date#12] orc

== Physical Plan ==
*(1) Project [city#9, state#10, country#11]
+- *(1) FileScan orc test_dev_db.newpartitiontable[city#9,state#10,country#11,tran_date#12] Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://xxx.host.com:8020/user/xxx/dev/hadoop/database/test_dev..., PartitionCount: 1, PartitionFilters: [isnotnull(tran_date#12), (tran_date#12 = 2009-12-01)], PushedFilters: [], ReadSchema: struct<city:string,state:string,country:string>

As you can see in the physical plan above, PartitionCount: 1 shows that Spark scanned only one of the 12 available partitions.
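The max() call on the partition strings works here because yyyy-MM-dd dates happen to sort correctly as plain text. As a minimal sketch for cases where that may not hold, the partition values can be parsed into real dates before comparing. The helper name latest_partition_date and its fmt parameter are hypothetical and only for illustration; the sample strings mimic the show partitions output above.

```python
from datetime import date, datetime


def latest_partition_date(partition_specs, column="tran_date", fmt="%Y-%m-%d"):
    """Return the newest date found for `column` in SHOW PARTITIONS-style strings.

    Hypothetical helper: the name and the `fmt` parameter are assumptions.
    """
    prefix = column + "="
    dates = []
    for spec in partition_specs:
        # A spec may hold several levels, e.g. "country=usa/tran_date=2009-01-01".
        for part in spec.strip("/").split("/"):
            if part.startswith(prefix):
                dates.append(datetime.strptime(part[len(prefix):], fmt).date())
    if not dates:
        raise ValueError("no partition values found for column %r" % column)
    return max(dates)


# In a live session the specs would presumably come from something like:
#   specs = [row.partition for row in spark.sql("show partitions db.table").collect()]
specs = ["tran_date=2009-01-01", "tran_date=2009-12-01", "tran_date=2009-02-01"]
latest = latest_partition_date(specs)
print(latest.isoformat())
```

The resulting value can be formatted back into the where clause in the same way max_date is used above. Only the partition list is collected to the driver, so the table's data files are still not read.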

vikrant rana answered Jan 02 '23 20:01


Building on Vikrant's answer, here is a more general way of extracting partition column values directly from the table metadata, which avoids Spark scanning through all the files in the table.

First, if your data isn't already registered in a catalog, you'll want to do that so Spark can see the partition details. Here, I'm registering a new table named data.

spark.catalog.createTable(
    'data',
    path='/path/to/the/data',
    source='parquet',
)
spark.catalog.recoverPartitions('data')
partitions = spark.sql('show partitions data')

To show a self-contained answer, however, I'll manually create the partitions DataFrame so you can see what it would look like, along with the solution for extracting a specific column value from it.

from pyspark.sql.functions import (
    col,
    regexp_extract,
)

partitions = (
    spark.createDataFrame(
        [
            ('/country=usa/region=ri/',),
            ('/country=usa/region=ma/',),
            ('/country=russia/region=siberia/',),
        ],
        schema=['partition'],
    )
)
partition_name = 'country'

(
    partitions
    .select(
        'partition',
        regexp_extract(
            col('partition'),
            pattern=r'(\/|^){}=(\S+?)(\/|$)'.format(partition_name),
            idx=2,
        ).alias(partition_name),
    )
    .show(truncate=False)
)

The output of this query is:

+-------------------------------+-------+
|partition                      |country|
+-------------------------------+-------+
|/country=usa/region=ri/        |usa    |
|/country=usa/region=ma/        |usa    |
|/country=russia/region=siberia/|russia |
+-------------------------------+-------+
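To see what the pattern itself does without a Spark session, here is a rough plain-Python sketch using the standard re module. The function name extract_partition_value is hypothetical, and re.escape is an addition that is not in the answer's pattern; it only guards against regex metacharacters in the column name. Returning an empty string on no match mirrors how regexp_extract behaves.

```python
import re


def extract_partition_value(partition, partition_name):
    """Return the value of `partition_name` in a partition path, or '' if absent.

    Hypothetical helper mirroring the regexp_extract() call above.
    """
    # Same pattern as in the answer; re.escape() is an added safeguard.
    pattern = r'(\/|^){}=(\S+?)(\/|$)'.format(re.escape(partition_name))
    match = re.search(pattern, partition)
    # Group 2 is the value between "name=" and the next "/" (or end of string).
    return match.group(2) if match else ''


for path in ['/country=usa/region=ri/', '/country=russia/region=siberia/']:
    print(extract_partition_value(path, 'country'),
          extract_partition_value(path, 'region'))
```

The leading (\/|^) group is what keeps a column named date from matching inside tran_date=..., since the name must start either at the beginning of the string or right after a slash.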

The solution in Scala will look very similar to this, except the call to regexp_extract() will look slightly different:

    .select(
        regexp_extract(
            col("partition"),
            exp=s"(\\/|^)${partitionName}=(\\S+?)(\\/|$$)",
            groupIdx=2
        ).alias(partitionName).as[String]
    )

Again, the benefit of querying partition values in this way is that Spark will not scan all the files in the table to get you the answer. If you have a table with tens or hundreds of thousands of files in it, your time savings will be significant.

Nick Chammas answered Jan 02 '23 21:01