
How to change the attributes order in Apache SparkSQL `Project` operator?

This is a Catalyst specific problem

Below is my queryExecution.optimizedPlan before applying my Rule.

01 Project [x#9, p#10, q#11, if (isnull(q#11)) null else UDF(q#11) AS udfB_10#28, if (isnull(p#10)) null else UDF(p#10) AS udfA_99#93]
02 +- InMemoryRelation [x#9, p#10, q#11], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
03    :  +- *SerializeFromObject [assertnotnull(input[0, eic.R0, true], top level non-flat input object).x AS x#9, unwrapoption(IntegerType, assertnotnull(input[0, eic.R0, true], top level non-flat input object).p) AS p#10, unwrapoption(IntegerType, assertnotnull(input[0, eic.R0, true], top level non-flat input object).q) AS q#11]
04    :     +- *MapElements <function1>, obj#8: eic.R0
05    :        +- *DeserializeToObject newInstance(class java.lang.Long), obj#7: java.lang.Long
06    :           +- *Range (0, 3, step=1, splits=Some(2))

In line 01 I need to swap the positions of udfA and udfB, like this:

01 Project [x#9, p#10, q#11, if (isnull(p#10)) null else UDF(p#10) AS udfA_99#93, if (isnull(q#11)) null else UDF(q#11) AS udfB_10#28]

When I try to change the order of the attributes in a Project operation in SparkSQL via a Catalyst optimization rule, the result of the query is modified to an invalid value. Maybe I'm not doing everything that is needed. I'm just changing the order of the NamedExpression objects in the fields parameter:

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

object ReorderColumnsOnProjectOptimizationRule extends Rule[LogicalPlan] {

  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {

    // Only Project nodes are rewritten; operators that don't match are left unchanged
    case Project(fields: Seq[NamedExpression], child) =>
      if (checkCondition(fields)) Project(newFieldsObject(fields), child) else Project(fields, child)

  }

  private def newFieldsObject(fields: Seq[NamedExpression]): Seq[NamedExpression] = {
    // compare UDF computation costs and return the new NamedExpression list
    . . .
  }

  private def checkCondition(fields: Seq[NamedExpression]): Boolean = {
    // compare UDF computation costs and decide whether to change the order of the field list
    . . .
  }
  . . .
}

Note: I'm adding my Rule to SparkSQL's extraOptimizations:

spark.experimental.extraOptimizations = Seq(ReorderColumnsOnProjectOptimizationRule)
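
For completeness, here is a minimal sketch of the registration context (assuming a local SparkSession; the appName and master values are only illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("reorder-project-columns")  // illustrative values, not from the original notebook
  .master("local[*]")
  .getOrCreate()

// extraOptimizations takes a user-supplied Seq of optimizer rules
spark.experimental.extraOptimizations = Seq(ReorderColumnsOnProjectOptimizationRule)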

Any suggestions will be of great help.

EDIT 1

By the way, I created a notebook on Databricks for testing purposes. See this link for more details.

If I comment out line 60 below, the optimization is invoked and an error occurs.

. . .
58     // Do the UDF with lower cost first, so I need to change the field order
59     myPriorityList.size == 2 && myPriorityList(0) > myPriorityList(1)
60     false
61   }

What did I miss?

EDIT 2

Consider the following piece of code from compiler optimisation, which is almost analogous:

if ( really_slow_test(with,plenty,of,parameters)
     && slower_test(with,some,parameters)
     && fast_test // with no parameters
   )
 {
  ...then code...
 }

This code first evaluates an expensive function and then, on success, proceeds to evaluate the remainder of the expression. But even when the first test fails and the evaluation is short-circuited, there is a significant performance penalty, because the fat really_slow_test(...) is always evaluated. While retaining program correctness, one can rearrange the expression as follows:

if ( fast_test
     && slower_test(with,some,parameters)
     && really_slow_test(with,plenty,of,parameters)
   )
 {
  ...then code...
 }

My goal is to run the fastest UDFs first.
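
The same idea expressed in Scala, as a sketch only (fastTest, slowerTest and reallySlowTest are made-up predicates): && short-circuits left to right, so the cheap test guards the expensive calls.

// Hypothetical predicates, ordered from cheapest to most expensive
def fastTest: Boolean = true
def slowerTest(a: Int, b: Int): Boolean = a < b
def reallySlowTest(a: Int, b: Int, c: Int): Boolean = (a + b + c) % 2 == 0

// reallySlowTest runs only if the cheaper tests pass
if (fastTest && slowerTest(1, 2) && reallySlowTest(1, 2, 3)) {
  // ...then code...
}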

asked Feb 04 '18 by João Paraná


2 Answers

As stefanobaghino said, the schema produced by the analyzer is cached after analysis and the optimizer shouldn't change it.

If you use Spark 2.2 you can take advantage of SPARK-18127 and apply the rule in the Analyzer instead.

If you run this dummy app:

package panos.bletsos

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.SparkSessionExtensions


case class ReorderColumnsOnProjectOptimizationRule(spark: SparkSession) extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformDown  {
    case p: Project => {
      val fields = p.projectList
      if (checkConditions(fields, p.child)) {
        val modifiedFieldsObject = optimizePlan(fields, p.child, plan)
        val projectUpdated = p.copy(modifiedFieldsObject, p.child)
        projectUpdated
      } else {
        p
      }
    }
  }

  private def checkConditions(fields: Seq[NamedExpression], child: LogicalPlan): Boolean = {
    // compare UDFs computation cost and return Boolean
    val needsOptimization = listHaveTwoUDFsEnabledForOptimization(fields)
    if (needsOptimization) println(fields.mkString(" | "))
    needsOptimization
  }

  private def listHaveTwoUDFsEnabledForOptimization(fields: Seq[NamedExpression]): Boolean = {
    // a simple priority order based on UDF name suffix
    val myPriorityList = fields.map((e) => {
      if (e.name.toString().startsWith("udf")) {
        Integer.parseInt(e.name.toString().split("_")(1))
      } else {
        0
      }
    }).filter(e => e > 0)

    // Do the UDF with lower cost first, so I need to change the field order
    myPriorityList.size == 2 && myPriorityList(0) > myPriorityList(1)
  }

  private def optimizePlan(fields: Seq[NamedExpression],
    child: LogicalPlan,
    plan: LogicalPlan): Seq[NamedExpression] = {
    // change order on field list. Return LogicalPlan modified
    val myListWithUDF = fields.filter((e) =>  e.name.toString().startsWith("udf"))
    if (myListWithUDF.size != 2) {
      throw new UnsupportedOperationException(
        s"The size of UDF list have ${myListWithUDF.size} elements.")
    }
    val myModifiedList: Seq[NamedExpression] = Seq(myListWithUDF(1), myListWithUDF(0))
    val myListWithoutUDF = fields.filter((e) =>  !e.name.toString().startsWith("udf"))
    val modifiedFieldsObject = getFieldsReordered(myListWithoutUDF, myModifiedList)
    val msg = "•••• optimizePlan called : " + fields.size + " columns on Project.\n" +
      "•••• fields: " + fields.mkString(" | ") + "\n" +
      "•••• UDFs to reorder:\n" + myListWithUDF.mkString(" | ") + "\n" +
      "•••• field list without UDFs: " + myListWithoutUDF.mkString(" | ") + "\n" +
      "•••• modifiedFieldsObject: " + modifiedFieldsObject.mkString(" | ") + "\n"
    println(msg)
    modifiedFieldsObject
  }

  private def getFieldsReordered(fieldsWithoutUDFs: Seq[NamedExpression],
    fieldsWithUDFs: Seq[NamedExpression]): Seq[NamedExpression] = {
    fieldsWithoutUDFs.union(fieldsWithUDFs)
  }
}

case class R0(x: Int,
  p: Option[Int] = Some((new scala.util.Random).nextInt(999)),
  q: Option[Int] = Some((new scala.util.Random).nextInt(999))
)

object App {
  def main(args : Array[String]) {
    type ExtensionsBuilder = SparkSessionExtensions => Unit
    // inject the rule here
    val f: ExtensionsBuilder = { e =>
      e.injectResolutionRule(ReorderColumnsOnProjectOptimizationRule)
    }

    val spark = SparkSession
      .builder()
      .withExtensions(f)
      .getOrCreate()

    def createDsR0(spark: SparkSession): Dataset[R0] = {
      import spark.implicits._
      val ds = spark.range(3)
      val xdsR0 = ds.map((i) => {
        R0(i.intValue() + 1)
      })
      // IMPORTANT: The cache here is mandatory
      xdsR0.cache()
    }

    val dsR0 = createDsR0(spark)
    val udfA_99 = (p: Int) => Math.cos(p * p)  // higher cost Function
    val udfB_10 = (q: Int) => q + 1            // lower cost Function

    println("*** I' going to register my UDF ***")
    spark.udf.register("myUdfA", udfA_99)
    spark.udf.register("myUdfB", udfB_10)

    val dsR1 = {
      val ret1DS = dsR0.selectExpr("x", "p", "q", "myUdfA(p) as udfA_99")
      val result = ret1DS.cache()
      dsR0.show()
      result.show()

      result
    }

    val dsR2 = {
      val ret2DS = dsR1.selectExpr("x", "p", "q", "udfA_99", "myUdfB(p) as udfB_10")
      val result = ret2DS.cache()
      dsR0.show()
      dsR1.show()
      result.show()

      result
    }
  }
}

it will print:

+---+---+---+-------+-------------------+
|  x|  p|  q|udfB_10|            udfA_99|
+---+---+---+-------+-------------------+
|  1|392|746|    393|-0.7508388993643841|
|  2|778|582|    779| 0.9310990915956336|
|  3|661| 34|    662| 0.6523545972748773|
+---+---+---+-------+-------------------+
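
To check that the reordering actually happened at analysis time, a quick sketch (reusing dsR2 from the app above) is to print the plans; the analyzed plan's Project should now list udfB_10 before udfA_99:

// extended = true prints the parsed, analyzed, optimized and physical plans
dsR2.explain(true)
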
answered Nov 09 '22 by Panos


I believe the answer to this question is the same as this one.

The summary is that the optimizer is not supposed to alter the schema of the output, as it is cached after analysis.
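
You can observe this directly on any Dataset affected by such a rule (a diagnostic sketch; ds stands for the Dataset being queried): the schema used to convert rows to the external format comes from the analyzed plan, so an extraOptimizations rule that reorders the project list makes the two schemas below disagree.

val qe = ds.queryExecution
println(qe.analyzed.schema.simpleString)       // the schema cached after analysis
println(qe.optimizedPlan.schema.simpleString)  // the schema after the custom optimizer rule runs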

I'll quote the accepted answer here as it comes from Michael Armbrust, the lead developer of the Spark SQL project at Databricks:

As you guessed, this is failing to work because we make assumptions that the optimizer will not change the results of the query.

Specifically, we cache the schema that comes out of the analyzer (and assume the optimizer does not change it). When translating rows to the external format, we use this schema and thus are truncating the columns in the result. If you did more than truncate (i.e. changed datatypes) this might even crash.

As you can see in this notebook, it is in fact producing the result you would expect under the covers. We are planning to open up more hooks at some point in the near future that would let you modify the plan at other phases of query execution. See SPARK-18127 for more details.

answered Nov 09 '22 by stefanobaghino