I have some data partitioned this way:
/data/year=2016/month=9/version=0
/data/year=2016/month=10/version=0
/data/year=2016/month=10/version=1
/data/year=2016/month=10/version=2
/data/year=2016/month=10/version=3
/data/year=2016/month=11/version=0
/data/year=2016/month=11/version=1
When using this data, I'd like to load the last version only of each month.
A simple way to do this is to load each month's latest version directly, e.g. load("/data/year=2016/month=11/version=1"), instead of doing load("/data").
The drawback of this solution is the loss of partitioning information such as year and month, which means it would no longer be possible to apply operations based on the year or the month.
Is it possible to ask Spark to load the last version only of each month? How would you go about this?
Well, Spark supports predicate push-down, so if you provide a filter after the load, it will only read in the data that satisfies the filter's criteria. Like this:
spark.read.option("basePath", "/data").load("/data").filter('version === 3)
And you get to keep the partitioning information :)
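If you want to confirm that the filter really prunes partitions rather than triggering a full scan, a quick check (just a sketch, using the same paths as above) is to look at the physical plan: the predicate should show up under PartitionFilters with a reduced partition count, much like the Hive/ORC plans further down in this thread.
import org.apache.spark.sql.functions.col

// Sketch: the version predicate should appear in PartitionFilters,
// so only the matching version directories are listed and read.
spark.read
  .option("basePath", "/data")
  .load("/data")
  .filter(col("version") === 3)
  .explain(true)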
Just an addition to the previous answers, for reference.
I have the below ORC-format table in Hive, which is partitioned on the year, month and day columns.
hive (default)> show partitions test_dev_db.partition_date_table;
OK
year=2019/month=08/day=07
year=2019/month=08/day=08
year=2019/month=08/day=09
If I set the properties below, I can read just the desired partition's data in Spark SQL, as shown below:
spark.sql("SET spark.sql.orc.enabled=true");
spark.sql("SET spark.sql.hive.convertMetastoreOrc=true")
spark.sql("SET spark.sql.orc.filterPushdown=true")
spark.sql("""select * from test_dev_db.partition_date_table where year ='2019' and month='08' and day='07' """).explain(True)
We can see PartitionCount: 1 in the plan, so it's clear that only the filtered partition has been read.
== Physical Plan ==
*(1) FileScan orc test_dev_db.partition_date_table[emp_id#212,emp_name#213,emp_salary#214,emp_date#215,year#216,month#217,day#218] Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://xxx.host.com:8020/user/xxxx/dev/hadoop/database/test_dev..., **PartitionCount: 1**, PartitionFilters: [isnotnull(year#216), isnotnull(month#217), isnotnull(day#218), (year#216 = 2019), (month#217 = 0..., PushedFilters: [], ReadSchema: struct<emp_id:int,emp_name:string,emp_salary:int,emp_date:date>
Whereas the same will not work if I use the query below. Even if we create a DataFrame using spark.read.format("orc").load(<hdfs absolute path of the table>), register a temporary view, and run Spark SQL on that, it will still scan all the partitions available for that table unless we use a specific filter condition on top of it.
spark.sql("""select * from test_dev_db.partition_date_table where year ='2019' and month='08' and day in (select max(day) from test_dev_db.partition_date_table)""").explain(True)
It has still scanned all three partitions; here PartitionCount: 3.
== Physical Plan ==
*(2) BroadcastHashJoin [day#282], [max(day)#291], LeftSemi, BuildRight
:- *(2) FileScan orc test_dev_db.partition_date_table[emp_id#276,emp_name#277,emp_salary#278,emp_date#279,year#280,month#281,day#282] Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://xxx.host.com:8020/user/xxx/dev/hadoop/database/test_dev..., PartitionCount: 3, PartitionFilters: [isnotnull(year#280), isnotnull(month#281), (year#280 = 2019), (month#281 = 08)], PushedFilters: [], ReadSchema: struct<emp_id:int,emp_name:string,emp_salary:int,emp_date:date>
To filter the data based on the max partition using Spark SQL, we can use the approach below. This technique relies on partition pruning to limit the number of files and partitions that Spark reads when querying the Hive ORC table.
# Flatten the SHOW PARTITIONS output into one partition string per element,
# e.g. "year=2019/month=08/day=07"
rdd=spark.sql("""show partitions test_dev_db.partition_date_table""").rdd.flatMap(lambda x:x)
# Strip the key names and separators so each partition becomes ['2019', '08', '07']
newrdd=rdd.map(lambda x : x.replace("/","")).map(lambda x : x.replace("year=","")).map(lambda x : x.replace("month=","-")).map(lambda x : x.replace("day=","-")).map(lambda x : x.split('-'))
# A plain string max() works here because the month and day values are zero-padded
max_year=newrdd.map(lambda x : (x[0])).max()
max_month=newrdd.map(lambda x : x[1]).max()
max_day=newrdd.map(lambda x : x[2]).max()
Prepare your query to filter the Hive partitioned table using these max values:
query = "select * from test_dev_db.partition_date_table where year ='{0}' and month='{1}' and day ='{2}'".format(max_year,max_month,max_day)
>>> spark.sql(query).show();
+------+--------+----------+----------+----+-----+---+
|emp_id|emp_name|emp_salary| emp_date|year|month|day|
+------+--------+----------+----------+----+-----+---+
| 3| Govind| 810000|2019-08-09|2019| 08| 09|
| 4| Vikash| 5500|2019-08-09|2019| 08| 09|
+------+--------+----------+----------+----+-----+---+
spark.sql(query).explain(True)
If you look at the plan of this query, you can see that it has scanned only one partition of the given Hive table; here PartitionCount is 1.
== Optimized Logical Plan ==
Filter (((((isnotnull(day#397) && isnotnull(month#396)) && isnotnull(year#395)) && (year#395 = 2019)) && (month#396 = 08)) && (day#397 = 09))
+- Relation[emp_id#391,emp_name#392,emp_salary#393,emp_date#394,year#395,month#396,day#397] orc
== Physical Plan ==
*(1) FileScan orc test_dev_db.partition_date_table[emp_id#391,emp_name#392,emp_salary#393,emp_date#394,year#395,month#396,day#397] Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://xxx.host.com:8020/user/xxx/dev/hadoop/database/test_dev..., PartitionCount: 1, PartitionFilters: [isnotnull(day#397), isnotnull(month#396), isnotnull(year#395), (year#395 = 2019), (month#396 = 0..., PushedFilters: [], ReadSchema: struct<emp_id:int,emp_name:string,emp_salary:int,emp_date:date>
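The same partition-listing trick can also be sketched in Scala, for readers following the Scala answers in this thread (this is only a sketch and assumes the same table and zero-padded partition values as above):
// Parse the SHOW PARTITIONS output ("year=2019/month=08/day=07") into its values.
val partitionValues = spark.sql("show partitions test_dev_db.partition_date_table")
  .collect()
  .map(_.getString(0).split("/").map(_.split("=")(1)))

val maxYear  = partitionValues.map(_(0)).max
val maxMonth = partitionValues.map(_(1)).max
val maxDay   = partitionValues.map(_(2)).max

// Filtering on plain literals keeps partition pruning effective.
spark.sql(s"select * from test_dev_db.partition_date_table " +
  s"where year='$maxYear' and month='$maxMonth' and day='$maxDay'").show()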
I think you have to use Spark's window functions to find, and then keep, only the latest version.
import org.apache.spark.sql.functions.{col, first}
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy("year","month").orderBy(col("version").desc)
spark.read.load("/data")
.withColumn("maxVersion", first("version").over(windowSpec))
.select("*")
.filter(col("maxVersion") === col("version"))
.drop("maxVersion")
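A slight variant of the same idea, as a sketch (column names as in the question): compute the per-month maximum with max over an un-ordered window instead of relying on first over a descending ordering.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max}

// Without an orderBy, the window frame is the whole (year, month) partition,
// so max("version") is the latest version for that month.
val byMonth = Window.partitionBy("year", "month")

spark.read.load("/data")
  .withColumn("maxVersion", max(col("version")).over(byMonth))
  .filter(col("version") === col("maxVersion"))
  .drop("maxVersion")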
Let me know if this works for you.
Here's a general Scala function:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{array, col, row_number}
/**
* Given a DataFrame, use keys (e.g. last modified time), to show the most up to date record
*
* @param dF DataFrame to be parsed
* @param groupByKeys These are the columns you would like to groupBy and expect to be duplicated,
* hence why you're trying to obtain records according to a latest value of keys.
* @param keys The sequence of keys used to rank the records in the table
* @return DataFrame with records that have rank 1, this means the most up to date version of those records
*/
def getLastUpdatedRecords(dF: DataFrame, groupByKeys: Seq[String], keys: Seq[String]): DataFrame = {
val part = Window.partitionBy(groupByKeys.head, groupByKeys.tail: _*).orderBy(array(keys.head, keys.tail: _*).desc)
val rowDF = dF.withColumn("rn", row_number().over(part))
val res = rowDF.filter(col("rn")===1).drop("rn")
res
}
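Applied to the layout from the question, a usage sketch could look like this (it assumes a SparkSession named spark, as elsewhere in this thread, and simply uses the question's partition columns as the keys):
// Keep only the latest version within each (year, month) partition.
val latest = getLastUpdatedRecords(
  dF = spark.read.option("basePath", "/data").load("/data"),
  groupByKeys = Seq("year", "month"),
  keys = Seq("version")
)
latest.show()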