My dataset is partitioned in this way:
Year=yyyy
|---Month=mm
| |---Day=dd
| | |---<parquet-files>
What is the easiest and most efficient way to create a DataFrame in Spark loaded with data between two dates?
Range partitioning in Apache Spark: with this method, tuples whose keys fall within the same range end up on the same machine. A range partitioner partitions keys based on an ordering of the keys and on the chosen set of sorted key ranges.
Spark automatically partitions RDDs and distributes the partitions across different nodes. A partition in Spark is an atomic chunk of data (a logical division of the data) stored on a node in the cluster. Partitions are the basic units of parallelism in Apache Spark, and RDDs are collections of partitions.
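A quick way to see this in action (a minimal sketch, assuming a SparkSession named spark is in scope; names are just examples):
import org.apache.spark.sql.functions.col
// Number of partitions Spark chose automatically for this Dataset
val nums = spark.range(0, 1000000).toDF("id")
println(nums.rdd.getNumPartitions)
// Range-partition by key ordering: ids in the same range land in the same partition
val ranged = nums.repartitionByRange(8, col("id"))
println(ranged.rdd.getNumPartitions) // 8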
date_format() is a date function that returns a date in a specified format. The Spark SQL functions package has to be imported to use the date functions; Seq() takes the date 2021-02-14 as input, and current_date() returns the current date.
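As a rough illustration (column names and format pattern are just examples, assuming spark.implicits._ is imported):
import org.apache.spark.sql.functions.{current_date, date_format, col}
import spark.implicits._
val dates = Seq("2021-02-14").toDF("input_date")
dates.select(
  date_format(col("input_date"), "MM/dd/yyyy").as("formatted"),
  current_date().as("today")
).show()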
Spark/PySpark partitioning is a way to split the data into multiple partitions so that transformations can run on those partitions in parallel, which completes the job faster. You can also write partitioned data into a file system (as multiple sub-directories) for faster reads by downstream systems.
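For instance, a layout like the one in the question is usually produced with partitionBy (a sketch only; df is a hypothetical DataFrame that already has Year, Month and Day columns):
import org.apache.spark.sql.SaveMode
// Writes one sub-directory per distinct (Year, Month, Day) combination
df.write
  .mode(SaveMode.Append)
  .partitionBy("Year", "Month", "Day")
  .parquet("hdfs:///basepath")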
If you absolutely have to stick to this partitioning strategy, the answer depends on whether you are willing to bear partition discovery costs or not.
If you are willing to have Spark discover all partitions, which only needs to happen once (until you add new files), you can load the basepath and then filter using the partition columns.
If you do not want Spark to discover all the partitions, e.g., because you have millions of files, the only efficient general solution is to break the interval you want to query into several sub-intervals you can easily query using @r0bb23's approach and then union them together.
If you want the best of both cases above and you have a stable schema, you can register the partitions in the metastore by defining an external partitioned table. Don't do this if you expect your schema to evolve as metastore-managed tables manage schema evolution quite poorly at this time.
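If you go the metastore route, a minimal sketch could look like the following (assuming Hive metastore support is enabled; the table name and data column are made up, so adjust them to your schema):
// Register the existing directory layout as an external partitioned table
spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS events (value STRING)
  PARTITIONED BY (Year INT, Month INT, Day INT)
  STORED AS PARQUET
  LOCATION 'hdfs:///basepath'
""")
// Have the metastore pick up the existing Year=/Month=/Day= directories
spark.sql("MSCK REPAIR TABLE events")
// Partition pruning now goes through the metastore instead of file listing
val events = spark.sql(
  "SELECT * FROM events WHERE Year = 2017 AND Month = 10 AND Day >= 6")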
For example, to query between 2017-10-06 and 2017-11-03 you'd do:
// With full discovery
spark.read.parquet("hdfs:///basepath")
.where('Year === 2017 && (
('Month === 10 && 'Day >= 6) || ('Month === 11 && 'Day <= 3)
))
// With partial discovery
val df1 = spark.read.option("basePath", "hdfs:///basepath/")
.parquet("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9], [1-3][0-9]}/*/")
val df2 = spark.read.option("basePath", "hdfs:///basepath/")
.parquet("hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
val df = df1.union(df2)
Writing generic code for this is certainly possible but I haven't encountered it. The better approach is to partition in the manner outlined in the comment I made to the question. If your table was partitioned using something like /basepath/ts=yyyymmddhhmm/*.parquet
then the answer is simply:
spark.read.parquet("hdfs:///basepath")
.where('ts >= 201710060000L && 'ts <= 201711030000L)
The reason it's worth adding hours & minutes is that you can then write generic code that handles intervals regardless of whether you have the data partitioned by week, day, hour, or every 15 minutes. In fact, you can even manage data at different granularities in the same table, e.g., older data aggregated at higher levels to reduce the total number of partitions that need to be discovered.
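To sketch what such generic code could look like (the helper name and the yyyymmddhhmm assumption are mine, not from the answer):
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
// Filters a ts=yyyymmddhhmm partitioned table to an arbitrary date-time interval
def betweenTs(df: DataFrame, from: LocalDateTime, to: LocalDateTime): DataFrame = {
  val fmt = DateTimeFormatter.ofPattern("yyyyMMddHHmm")
  df.where(col("ts") >= from.format(fmt).toLong && col("ts") <= to.format(fmt).toLong)
}
val df = betweenTs(
  spark.read.parquet("hdfs:///basepath"),
  LocalDateTime.of(2017, 10, 6, 0, 0),
  LocalDateTime.of(2017, 11, 3, 0, 0))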
Edited to add multiple load paths to address comment.
You can use a regex style syntax.
val dataset = spark
.read
.format("parquet")
.option("filterPushdown", "true")
.option("basePath", "hdfs:///basepath/")
.load("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9],[1-3][0-9]}/*/",
"hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
See also: How to use regex to include/exclude some input files in sc.textFile?
Note: you don't need the X=*; you can just use * if you want all days, months, etc.
You should probably also do some reading about Predicate Pushdown (i.e., filterPushdown set to true above).
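One quick way to sanity-check what actually gets pruned and pushed down (a sketch reusing the dataset from above):
import org.apache.spark.sql.functions.col
// Look for PartitionFilters / PushedFilters in the physical plan
dataset
  .where(col("Year") === 2017 && col("Month") === 10)
  .explain(true)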
Finally, you will notice the basePath option above; the reason for it can be found here: Prevent DataFrame.partitionBy() from removing partitioned columns from schema