What's the simplest/fastest way to get the partition keys, ideally into a Python list?
Ultimately I want to use this to avoid reprocessing partitions that have already been processed. In the example below I only want to process data from day 3, but there may be more than one day left to process.
Let's say the directory structure is:
date_str=2010-01-01
date_str=2010-01-02
date_str=2010-01-03
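For context, a layout like this is what a partitioned write produces. A minimal sketch, assuming an existing DataFrame ddf with a date_str column (the name ddf is illustrative):
ddf.write.partitionBy('date_str').csv('data/bydate')
# writes one date_str=<value> subdirectory per distinct value of date_str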
Reading the DataFrame with the partition information:
ddf2 = spark.read.csv("data/bydate")
The solutions I have tried are below. They look excessively wordy and I'm not sure they are fast. The query shouldn't need to read any data, since it only has to check the directory keys.
from pyspark.sql import functions as F
ddf2.select(F.collect_set('date_str').alias('date_str')).first()['date_str']
# seems to work well albeit wordy
ddf2.select("date_str").distinct().collect()
# [Row(date_str=datetime.date(2010, 1, 10)), Row(date_str=datetime.date(2010, 1, 7)), ...]
# not a python list and slow?
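If you stick with the distinct() route, the Row objects can at least be flattened into a plain Python list. A minimal sketch, using the same ddf2 as above:
dates = [row['date_str'] for row in ddf2.select('date_str').distinct().collect()]
# dates is a plain Python list of the partition values (still a full scan, though)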
ddf2.createOrReplaceTempView("intent")
spark.sql("""show partitions intent""").toPandas()
# error
ddf2.rdd.getNumPartitions()
# returns only the number of RDD partitions, not the keys, and that number doesn't even match the number of partition keys
Related questions I have looked at:
Convert distinct values in a Dataframe in Pyspark to a list
PySpark + Cassandra: Getting distinct values of partition key
pyspark - getting Latest partition from Hive partitioned column logic
Show partitions on a pyspark RDD
So indeed, the collect_set and distinct solutions scan all the data and will be horribly slow on large datasets. The details are documented here:
https://issues.apache.org/jira/browse/SPARK-34194
https://issues.apache.org/jira/browse/SPARK-12890
AFAIK the fastest way to get the partition keys is the approach from pyspark - getting Latest partition from Hive partitioned column logic, adapted here to get them into a list (single partition column solution):
spark.catalog.createTable(
    'table',
    path='data/bydate',
    source='csv',
)
spark.catalog.recoverPartitions('table')  # registers the date_str=... directories as partitions
df_partitions = spark.sql('show partitions table').toPandas()
# strip the column-name prefix to keep just the values, e.g. 'date_str=2010-01-01' -> '2010-01-01'
partitions = df_partitions['partition'].str.replace('date_str=', '').tolist()
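To close the loop on the original goal (only process the days that haven't been handled yet), a hedged follow-up, where already_processed is a hypothetical list you maintain elsewhere:
already_processed = ['2010-01-01', '2010-01-02']  # hypothetical bookkeeping of processed days
days_to_process = sorted(set(partitions) - set(already_processed))
# ['2010-01-03']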
Since there isn't a neat PySpark-only solution, you can instead list the partition directories directly.
Databricks: ls the directory
import re
# capture the value after '=' in each subdirectory name, e.g. 'date_str=2010-01-01/' -> '2010-01-01'
[re.search(r'=(.+?)/?$', o.name).group(1) for o in dbutils.fs.ls(cfg_fpath_intent)]
S3: use AWS Data Wrangler's (awswrangler) list_directories instead of ls.
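A minimal sketch of that, assuming awswrangler is installed and the partitioned data lives under the hypothetical prefix s3://my-bucket/data/bydate/:
import awswrangler as wr
dirs = wr.s3.list_directories('s3://my-bucket/data/bydate/')
# keep just the partition values, e.g. '.../date_str=2010-01-01/' -> '2010-01-01'
partitions = [d.rstrip('/').split('=')[-1] for d in dirs]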