
Spark SQL queries on partitioned data using Date Ranges

My dataset is partitioned in this way:

Year=yyyy
 |---Month=mm
 |   |---Day=dd
 |   |   |---<parquet-files>

What is the easiest and most efficient way to create a DataFrame in Spark loaded with data between two dates?

asked Nov 08 '17 by r4ravi2008



2 Answers

If you absolutely have to stick to this partitioning strategy, the answer depends on whether you are willing to bear partition discovery costs or not.

If you are willing to have Spark discover all partitions, which only needs to happen once (until you add new files), you can load the basepath and then filter using the partition columns.

If you do not want Spark to discover all the partitions, e.g., because you have millions of files, the only efficient general solution is to break the interval you want to query into several sub-intervals that you can easily query using @r0bb23's approach, and then union them together.

If you want the best of both cases above and you have a stable schema, you can register the partitions in the metastore by defining an external partitioned table. Don't do this if you expect your schema to evolve, since metastore-managed tables handle schema evolution quite poorly at this time.
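
As a rough sketch of what registering such an external partitioned table could look like (assuming a Hive-enabled SparkSession; the events table name and payload column are placeholders, not part of the original setup):

// A sketch, assuming a Hive-enabled SparkSession; "events" and the
// payload column are placeholders for the real schema.
spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS events (payload STRING)
  PARTITIONED BY (Year INT, Month INT, Day INT)
  STORED AS PARQUET
  LOCATION 'hdfs:///basepath'
""")

// Register the existing Year=/Month=/Day= directories as partitions in the
// metastore so later queries can prune without rediscovering the files.
spark.sql("MSCK REPAIR TABLE events")

spark.sql("SELECT * FROM events WHERE Year = 2017 AND Month = 10 AND Day >= 6")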

For example, to query between 2017-10-06 and 2017-11-03 you'd do:

// With full discovery
import spark.implicits._  // enables the 'col symbol-to-Column syntax

spark.read.parquet("hdfs:///basepath")
  .where('Year === 2017 && (
    ('Month === 10 && 'Day >= 6) || ('Month === 11 && 'Day <= 3)
  ))

// With partial discovery
val df1 = spark.read.option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9],[1-3][0-9]}/*/")
val df2 = spark.read.option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
val df = df1.union(df2)

Writing generic code for this is certainly possible, but I haven't encountered it. The better approach is to partition in the manner outlined in the comment I made to the question. If your table were partitioned using something like /basepath/ts=yyyymmddhhmm/*.parquet then the answer would simply be:

spark.read.parquet("hdfs:///basepath")
  .where('ts >= 201710060000L && 'ts <= 201711030000L)

The reason it's worth adding hours and minutes is that you can then write generic code that handles intervals regardless of whether the data is partitioned by week, day, hour, or every 15 minutes. In fact, you can even manage data with different granularity in the same table; e.g., older data can be aggregated at higher levels to reduce the total number of partitions that need to be discovered.
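
As an illustration of that point (not code from the answer; the helper name and formatter are my own), a generic loader for the ts=yyyymmddhhmm layout might look like this:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Convert the interval endpoints to the yyyymmddhhmm Long form used by the
// partition column, then filter; works for any partition granularity.
val tsFormat = DateTimeFormatter.ofPattern("yyyyMMddHHmm")

def loadInterval(basePath: String, from: LocalDateTime, to: LocalDateTime) = {
  val lo = from.format(tsFormat).toLong
  val hi = to.format(tsFormat).toLong
  spark.read.parquet(basePath).where(s"ts >= $lo AND ts <= $hi")
}

val df = loadInterval("hdfs:///basepath",
  LocalDateTime.of(2017, 10, 6, 0, 0),
  LocalDateTime.of(2017, 11, 3, 0, 0))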

answered Oct 03 '22 by Sim


Edited to add multiple load paths to address comment.

You can use a regex-like glob syntax in the load paths.

val dataset = spark
  .read
  .format("parquet")
  .option("filterPushdown", "true")
  .option("basePath", "hdfs:///basepath/")
  .load("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9],[1-3][0-9]}/*/",
    "hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")

See also: How to use regex to include/exclude some input files in sc.textFile?

Note: you don't need the X=* form; you can just use * if you want all days, months, etc.
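
For example (an illustrative path of my own, following the same layout as above), loading all of 2017 without naming the Month and Day partition values:

val all2017 = spark
  .read
  .option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/*/*/*/")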

You should probably also do some reading about predicate pushdown (i.e., filterPushdown set to true above).
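
For reference (a session-level setting I'm adding for illustration, not something from the answer), Parquet predicate pushdown can also be controlled globally and is enabled by default in recent Spark versions:

// Enable (or confirm) Parquet predicate pushdown for the whole session.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")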

Finally, you will notice the basePath option above; the reason for it can be found here: Prevent DataFrame.partitionBy() from removing partitioned columns from schema

answered Oct 01 '22 by Robert Beatty