
pyspark dataframe get partition keys

Tags:

pyspark

What's the simplest/fastest way to get the partition keys? Ideally into a Python list.

Ultimately I want to use this to avoid reprocessing data from partitions that have already been processed. So in the example below I only want to process data from day 3, but there may be more than one day to process.

Let's say the directory structure is:

date_str=2010-01-01
date_str=2010-01-02
date_str=2010-01-03

Reading the dataframe with partition information

ddf2 = spark.read.csv("data/bydate")
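For context, once the keys are in a Python list, the downstream filtering is straightforward. A rough sketch, where days_to_process is a hypothetical list of keys that still need processing (the cast guards against Spark having inferred the partition column as a date):

from pyspark.sql import functions as F

days_to_process = ['2010-01-03']  # hypothetical: keys not yet processed
ddf_todo = ddf2.filter(F.col('date_str').cast('string').isin(days_to_process))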

Solutions I have tried for getting the keys are below. They look excessively wordy and I'm not sure if they are fast. The query shouldn't need to read any data, since it only has to check the directory keys.

from pyspark.sql import functions as F

ddf2.select(F.collect_set('date_str').alias('date_str')).first()['date_str']
# seems to work well albeit wordy

ddf2.select("date_str").distinct().collect()
# [Row(date_str=datetime.date(2010, 1, 10)), Row(date_str=datetime.date(2010, 1, 7)),
# not a python list and slow?

ddf2.createOrReplaceTempView("intent")
spark.sql("""show partitions intent""").toPandas()
# fails: SHOW PARTITIONS works on catalog tables, not temp views

ddf2.rdd.getNumPartitions()
# returns only the number of RDD partitions, not the partition keys

Related questions:

Convert distinct values in a Dataframe in Pyspark to a list

PySpark + Cassandra: Getting distinct values of partition key

pyspark - getting Latest partition from Hive partitioned column logic

Show partitions on a pyspark RDD

asked Oct 27 '25 by citynorman


1 Answer

So indeed, the collect_set and distinct solutions scan all the data and will be horribly slow on large datasets. The details are documented here:

https://issues.apache.org/jira/browse/SPARK-34194

https://issues.apache.org/jira/browse/SPARK-12890

AFAIK the fastest way to get the partition keys is the approach from pyspark - getting Latest partition from Hive partitioned column logic.

Adapted to return a Python list (assuming a single partition column):

# register the CSV directory as an external table so the catalog tracks its partitions
spark.catalog.createTable(
    'table',
    path='data/bydate',
    source='csv',
)
# discover the date_str=... directories as partitions
spark.catalog.recoverPartitions('table')

df_partitions = spark.sql('show partitions table').toPandas()
partitions = df_partitions['partition'].str.replace('date_str=', '').tolist()
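From there, the keys that still need processing are just a set difference. A small sketch, where processed is a hypothetical list of keys handled in earlier runs:

processed = ['2010-01-01', '2010-01-02']  # hypothetical: keys already handled
days_to_process = sorted(set(partitions) - set(processed))

That list can then be passed to an isin filter on date_str so only the new days are read.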

Since there isn't a neat PySpark-only solution, you can also list the partition directories directly.

Databricks: list the directory with dbutils.fs.ls

import re

# extract the partition value from each date_str=YYYY-MM-DD directory name
[re.search(r'=([\d-]+)', o.name).group(1) for o in dbutils.fs.ls(cfg_fpath_intent)]

S3: use AWS Data Wrangler's (awswrangler) list_directories instead of ls.
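A minimal sketch using awswrangler (AWS SDK for pandas); the bucket and prefix below are placeholders for wherever the partitioned data lives:

import re
import awswrangler as wr

dirs = wr.s3.list_directories('s3://my-bucket/data/bydate/')  # placeholder path
partitions = [re.search(r'=([\d-]+)', d).group(1) for d in dirs]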

answered Oct 30 '25 by citynorman


