Spark subquery scans all partitions

I have a Hive table that is partitioned by a date field (date_of), and I want to write a query that fetches the data from the latest (max) partition.

spark.sql("select field from table  where date_of = '2019-06-23'").explain(True)
vs 
spark.sql("select filed from table where date_of = (select max(date_of) from table)").explain(True)

Below are the physical plans of the two queries:

*(1) Project [qbo_company_id#120L]
+- *(1) FileScan parquet table[qbo_company_id#120L,date_of#157] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[s3location..., PartitionCount: 1, PartitionFilters: [isnotnull(date_of#157), (cast(date_of#157 as string) = 2019-06-23)], PushedFilters: [], ReadSchema: struct<qbo_company_id:bigint>

*(1) Project [qbo_company_id#1L]
+- *(1) Filter (date_of#38 = Subquery subquery0)
   :  +- Subquery subquery0
   :     +- *(2) HashAggregate(keys=[], functions=[max(date_of#76)], output=[max(date_of)#78])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_max(date_of#76)], output=[max#119])
   :              +- LocalTableScan [date_of#76]
   +- *(1) FileScan parquet table[qbo_company_id#1L,date_of#38] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[s3location..., PartitionCount: 1836, PartitionFilters: [isnotnull(date_of#38)], PushedFilters: [], ReadSchema: struct<qbo_company_id:bigint>

Why does the subquery version scan all 1,836 partitions instead of choosing only the latest one? Since the metastore already holds metadata about the partitions, why can Spark not scan only the required partition?

asked Jul 11 '19 by SelvamR

1 Answer

If I were you, I'd take a different approach rather than an SQL subquery and a scan across every partition. The value of the scalar subquery is only known at runtime, while partition pruning happens at planning time, which is why the second plan shows PartitionFilters: [isnotnull(date_of#38)] and PartitionCount: 1836. Instead, ask the metastore for the partition list directly:

spark.sql(s"show partitions $tablename")

Then convert that into a Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]] whose values are Joda dates:

/**
 * listMyHivePartitions - lists Hive partitions as a sequence of maps.
 *
 * @param tableName fully qualified table name
 * @param spark     SparkSession
 * @return Seq[Map[String, DateTime]]
 */
def listMyHivePartitions(tableName: String, spark: SparkSession): Seq[Map[String, DateTime]] = {
  println(s"Listing the partition keys of $tableName")
  val partitions: Seq[String] = spark.sql(s"show partitions $tableName").collect().map { row =>
    println(s" Identified key: ${row.toString()}")
    row.getString(0)
  }.toSeq
  println(s"Fetched ${partitions.size} partitions from $tableName")
  // Each partition spec looks like "k1=v1/k2=v2"; split into pairs and parse the values as dates.
  partitions.map(key => key.split("/").toSeq.map { keyVal =>
    val keyValSplit = keyVal.split("=")
    (keyValSplit(0).toLowerCase().trim, new DateTime(keyValSplit(1).trim))
  }.toMap)
}

Then apply getRecentPartitionDate, shown below, to pick the most recent partition:

/**
 * getRecentPartitionDate - sorts the partition maps descending by the given
 * date column and returns the most recent one, if any.
 *
 * @param column   name of the partition column to sort on
 * @param seqOfMap Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]]
 * @return the map holding the most recent date, or None if the sequence is empty
 */
def getRecentPartitionDate(column: String, seqOfMap: Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]]): Option[Map[String, DateTime]] = {
  logger.info(" >>>>> column " + column)
  // Sort descending: isAfter puts the most recent date first.
  val mapWithMostRecentBusinessDate = seqOfMap.sortWith(
    (a, b) => {
      logger.debug(a(column).toString() + " vs " + b(column).toString())
      a(column).isAfter(b(column))
    }
  )

  logger.debug(s" mapWithMostRecentBusinessDate: $mapWithMostRecentBusinessDate , \n Head = ${mapWithMostRecentBusinessDate.headOption} ")

  mapWithMostRecentBusinessDate.headOption
}
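Putting the two helpers together, a minimal sketch (the table name db.events, the column field, and the partitioning by date_of are assumptions for illustration): resolve the latest partition from metadata first, then query with a literal value so the predicate becomes a planning-time partition filter.

// Hypothetical end-to-end usage; assumes a table db.events partitioned by date_of.
val parts = listMyHivePartitions("db.events", spark)
getRecentPartitionDate("date_of", parts).foreach { latest =>
  val day = latest("date_of").toString("yyyy-MM-dd")
  // With a literal value the plan shows PartitionCount: 1, as in the first query above.
  spark.sql(s"select field from db.events where date_of = '$day'").explain(true)
}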

The advantage: no SQL against the data files and no full partition scans, only a metadata lookup.

The above can also be applied when you go through the Hive metastore (a database at the backend) over JDBC: fire show partitions against the table on that connection, and the result of the query is a java.sql.ResultSet:

/**
 * showParts - runs `show partitions` over a JDBC Statement and parses the result.
 *
 * @param table  table name
 * @param config application configuration
 * @param stmt   an open java.sql.Statement
 */
def showParts(table: String, config: Config, stmt: Statement): Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]] = {
  val showPartitionsCmd = "show partitions " + table
  logger.info("showPartitionsCmd " + showPartitionsCmd)
  try {
    val resultSet = stmt.executeQuery(showPartitionsCmd)
    val result = resultToSeq(resultSet)
    logger.info(s"partitions of $table -> $result")
    result
  } catch {
    case e: Exception =>
      logger.error(s"Exception occurred while showing partitions of table $table..", e)
      // Return an empty sequence rather than null so callers don't have to null-check.
      Seq.empty
  }
}

/**
 * resultToSeq - converts the `show partitions` ResultSet into a sequence of
 * maps of partition column name -> parsed Joda date.
 *
 * @param queryResult ResultSet of a `show partitions` query
 */
def resultToSeq(queryResult: ResultSet): Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]] = {
  import org.joda.time.format.DateTimeFormat

  val md = queryResult.getMetaData
  val colNames = for (i <- 1 to md.getColumnCount) yield md.getColumnName(i)
  val format = DateTimeFormat.forPattern("yyyy-MM-dd")

  var rows = Seq[scala.collection.immutable.Map[String, org.joda.time.DateTime]]()
  while (queryResult.next()) {
    var row = scala.collection.immutable.Map.empty[String, DateTime]
    for (n <- colNames) {
      // Each value looks like "date_of=2019-06-23"; split on '=' and parse the date part.
      val str = queryResult.getString(n).split("=")
      row += str(0) -> DateTime.parse(str(1), format)
      logger.debug(row.toString())
    }
    rows = rows :+ row
  }
  rows
}

After getting the sequence of maps this way, apply getRecentPartitionDate from the top, just as before, to pick the latest partition.
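A minimal sketch of that wiring, assuming a HiveServer2 JDBC endpoint; the URL, credentials, table name, and the config value are placeholders for whatever the surrounding application provides:

import java.sql.{Connection, DriverManager, Statement}

// Hypothetical connection details; replace with your HiveServer2 endpoint.
val conn: Connection = DriverManager.getConnection("jdbc:hive2://host:10000/default", "user", "")
val stmt: Statement = conn.createStatement()
try {
  val parts = showParts("db.events", config, stmt) // config: your application's Config instance
  println(getRecentPartitionDate("date_of", parts))
} finally {
  stmt.close()
  conn.close()
}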

answered Sep 30 '22 by Ram Ghadiyaram