
Joining Two Datasets with Predicate Pushdown

I have a Dataset that I created from an RDD, and I am trying to join it with another Dataset that is created from my Phoenix table:

val dfToJoin = sparkSession.createDataset(rddToJoin)
val tableDf = sparkSession
  .read
  .option("table", "table")
  .option("zkURL", "localhost")
  .format("org.apache.phoenix.spark")
  .load()
val joinedDf = dfToJoin.join(tableDf, "columnToJoinOn")

When I execute it, it seems that the whole database table is loaded to do the join.

Is there a way to do such a join so that the filtering is done in the database instead of in Spark?

Also: dfToJoin is smaller than the table; I do not know whether this is important.

Edit: Basically, I want to join my Phoenix table with a Dataset created through Spark, without fetching the whole table into the executor.

Edit2: Here is the physical plan:

*Project [FEATURE#21, SEQUENCE_IDENTIFIER#22, TAX_NUMBER#23, WINDOW_NUMBER#24, uniqueIdentifier#5, readLength#6]
+- *SortMergeJoin [FEATURE#21], [feature#4], Inner
   :- *Sort [FEATURE#21 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(FEATURE#21, 200)
   :     +- *Filter isnotnull(FEATURE#21)
   :        +- *Scan PhoenixRelation(FEATURES,localhost,false) [FEATURE#21,SEQUENCE_IDENTIFIER#22,TAX_NUMBER#23,WINDOW_NUMBER#24] PushedFilters: [IsNotNull(FEATURE)], ReadSchema: struct<FEATURE:int,SEQUENCE_IDENTIFIER:string,TAX_NUMBER:int,WINDOW_NUMBER:int>
   +- *Sort [feature#4 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(feature#4, 200)
         +- *Filter isnotnull(feature#4)
            +- *SerializeFromObject [assertnotnull(input[0, utils.CaseClasses$QueryFeature, true], top level Product input object).feature AS feature#4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, utils.CaseClasses$QueryFeature, true], top level Product input object).uniqueIdentifier, true) AS uniqueIdentifier#5, assertnotnull(input[0, utils.CaseClasses$QueryFeature, true], top level Product input object).readLength AS readLength#6]
               +- Scan ExternalRDDScan[obj#3]

As you can see, the equality condition of the join does not appear in the PushedFilters list (only IsNotNull(FEATURE) does), so the join is not being pushed down as a predicate.

Eti49 asked Mar 08 '23 00:03

1 Answer

Spark fetches the Phoenix table records into the appropriate executors; the rows are spread across them, not loaded as an entire table into one executor.

As there is no direct filter on the Phoenix table DataFrame, the only filter in the physical plan is *Filter isnotnull(FEATURE#21).

As you mention, dfToJoin is smaller than the table, so only a fraction of the Phoenix rows can match. You can push a filter on the FEATURE column down to the Phoenix table by first collecting the distinct feature values from the other dataset:

import sparkSession.implicits._ // needed for $"..." and for the encoder used by map

// Spread across the workers (fully distributed)
val dfToJoin = sparkSession.createDataset(rddToJoin)

// Collected to the driver (not distributed), so the set of distinct
// feature values must be small enough to fit in driver memory.
// FEATURE is an int in the plan above, hence getInt rather than getString.
val list_of_feature_ids = dfToJoin.dropDuplicates("feature")
  .select("feature")
  .map(r => r.getInt(0))
  .collect
  .toList

// Spread across the workers (fully distributed)
val tableDf = sparkSession
  .read
  .option("table", "table")
  .option("zkURL", "localhost")
  .format("org.apache.phoenix.spark")
  .load()
  .filter($"FEATURE".isin(list_of_feature_ids:_*)) // added filter on the Phoenix side

// Spread across the workers (fully distributed)
val joinedDf = dfToJoin.join(tableDf, "columnToJoinOn")

joinedDf.explain()
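
If the collected list of feature ids grows large, a single IN filter may become unwieldy. The following is only a sketch of one possible workaround, not something from the code above: it splits the distinct keys into fixed-size batches in plain Scala. The names KeyBatching, batchKeys and batchSize are hypothetical, and whether batching actually helps depends on the data source, since each batch means a separate scan.

```scala
// Hypothetical helper, not part of Spark or Phoenix: splits the distinct join
// keys into fixed-size batches so that each IN list stays small.
object KeyBatching {
  def batchKeys[A](keys: Seq[A], batchSize: Int): List[List[A]] = {
    require(batchSize > 0, "batchSize must be positive")
    // distinct keeps the first occurrence of each key, in order
    keys.distinct.grouped(batchSize).map(_.toList).toList
  }

  def main(args: Array[String]): Unit = {
    val featureIds = Seq(3, 1, 3, 7, 9, 1, 4)
    val batches = batchKeys(featureIds, batchSize = 2)
    batches.foreach(println)
    // With Spark in scope (assumption: phoenixDf is the unfiltered Phoenix
    // DataFrame), each batch could become one filtered read, unioned before
    // the join:
    //   batches.map(b => phoenixDf.filter($"FEATURE".isin(b: _*))).reduce(_ union _)
  }
}
```

Note that reduce fails on an empty list, so the commented-out union would need a guard for the case where dfToJoin contains no keys.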
mrsrinivas answered Mar 17 '23 04:03