 

DataFrame with a dynamic when condition using withColumn

I have Scala code that computes a new column with the withColumn function. The hard-coded version looks something like this:

 combinedinputdf
  .withColumn("AMGClassRule", when(col("CoreSectorLevel1Code") === "Derivatives" && col("CoreSectorLevel2Code") === "Caps" && col("FAS157Flavor") === "SPRD", "1")
      //.when(col("CoreSectorLevel1Code") === "Derivatives" && col("CoreSectorLevel2Code") === "Caps" && col("FAS157Flavor") === "SPRD", "2")
      .when(col("CoreSectorLevel1Code") === "Derivatives" && col("FAS157Flavor") === "TRSY" && col("DerivativeType") === "TROR", "3")
      .when(col("InDefaultInd") === "Y", "4"))

This works as expected.

But I want to add or modify the when conditions dynamically at runtime, based on a table or a CSV file.

The second table looks like this: [image: Tables with rules]

At runtime I could read this table into either a DataFrame or a Map, iterate through its rules, and assign a value to my output column.

How do I go about doing this dynamically?

Aaron asked Jun 13 '26 14:06


1 Answer

This can be done dynamically by describing the when conditions as data and building the chained expression from that description.

To make it more readable, let's create a dedicated wrapper object:

case class WhenCondition(valuesToCheck: Seq[(String, Any)], valueIfMatches: Any)

The valuesToCheck field holds a sequence of tuples, one for each condition in the expression to generate. The first element of each tuple is the column name and the second is the value to match against. For example: ("CoreSectorLevel1Code", "Derivatives").

valueIfMatches corresponds to the second argument passed to when: "1", "3" or "4" in the example.

We will need a function that reads conditions from the source table and returns a sequence of WhenCondition instances:

private def readConditions(args: Any*): Seq[WhenCondition] = {
  ??? // impl depends on how you read the source data
}

// for the example of the question, this function should return:
val conditions = Seq(
  WhenCondition(Seq(("CoreSectorLevel1Code", "Derivatives"), ("CoreSectorLevel2Code", "Caps"), ("FAS157Flavor", "SPRD")), "1"),
  WhenCondition(Seq(("CoreSectorLevel1Code", "Derivatives"), ("FAS157Flavor", "TRSY"), ("DerivativeType", "TROR")), "3"),
  WhenCondition(Seq(("InDefaultInd", "Y")), "4")
)
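Since the question mentions a CSV as one possible source, here is a minimal sketch of how readConditions could be filled in for that case. It is only an illustration under several assumptions that are not in the original: the helper name parseConditions is hypothetical, the file is plain comma-separated text without quoting, the first line is a header, the result column is called "rule", and an empty cell means "this column is not part of the rule".

```scala
// Same wrapper as in the answer, repeated so the sketch is self-contained.
case class WhenCondition(valuesToCheck: Seq[(String, Any)], valueIfMatches: Any)

// Hypothetical helper: turns CSV lines (header first) into WhenCondition instances.
// Assumes unquoted comma-separated values and a result column named "rule".
def parseConditions(lines: Seq[String], ruleColumn: String = "rule"): Seq[WhenCondition] =
  lines.map(_.trim).filter(_.nonEmpty) match {
    case Seq() => Seq.empty
    case header +: rows =>
      val names = header.split(",", -1).map(_.trim).toSeq
      val ruleIndex = names.indexOf(ruleColumn)
      require(ruleIndex >= 0, s"missing column '$ruleColumn'")
      rows.map { line =>
        // limit -1 keeps trailing empty cells, so every row has one cell per header
        val cells = line.split(",", -1).map(_.trim).toSeq
        require(cells.length == names.length, s"malformed row: $line")
        val checks = names.zip(cells).zipWithIndex.collect {
          // skip the rule column itself and empty cells (column not part of this rule)
          case ((name, value), i) if i != ruleIndex && value.nonEmpty => (name, value: Any)
        }
        WhenCondition(checks, cells(ruleIndex))
      }
  }

// The three rules of the question, written as CSV text.
val csv = Seq(
  "CoreSectorLevel1Code,CoreSectorLevel2Code,FAS157Flavor,DerivativeType,InDefaultInd,rule",
  "Derivatives,Caps,SPRD,,,1",
  "Derivatives,,TRSY,TROR,,3",
  ",,,,Y,4"
)
val parsed = parseConditions(csv)
```

With real data, the lines would come from wherever the file lives (for instance scala.io.Source), and a proper CSV parser would be a safer choice once values can contain commas or quotes.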

Update

We'll also need a function that chains the when calls:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, when}

private def chainWhen(chained: Column, remaining: Seq[(Column, Any)]): Column =
  remaining match {
    // Seq() and +: match any Seq implementation, whereas Nil and :: only match a List
    case Seq() => chained
    case head +: tail => chainWhen(chained.when(head._1, head._2), tail)
  }

Now we create a dynamicWhen function that transforms the sequence of WhenCondition instances into a single chained Column:

private def dynamicWhen(parsedConditions: Seq[WhenCondition]): Option[Column] = {
  // first, we transform a WhenCondition object into a tuple of args (Column, Any) for the target "when" function
  val conditions = parsedConditions.map(whenCondition => {
    val condition = whenCondition.valuesToCheck
                                 .map(cond => col(cond._1) === cond._2)
                                 .reduce(_ && _)
    (condition, whenCondition.valueIfMatches)
  })
  // if there weren't any input conditions, we return None, otherwise we chain the transformation and wrap it into Some
  // (Seq() and +: are used instead of Nil and :: so that a non-List Seq, e.g. the result of collect().toSeq, also matches)
  conditions match {
    case Seq() => None
    case head +: tail => Some(chainWhen(when(head._1, head._2), tail))
  }
}

And the original hard-coded call can be replaced with

// produce the optional chained dynamic "whens".
val whenConditions = dynamicWhen(conditions)

// map it in a DataFrame with a new column or keep the original one, if there were no "whens".
val result = whenConditions.map(cond => df.withColumn("AMGClassRule", cond))
                           .getOrElse(df)
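As a side note, the recursive chaining can also be written as a fold. The sketch below is one possible variant, not the approach of the answer itself: the name foldWhen and the default parameter are hypothetical. It differs in two ways: a rule without any checks is treated as "matches every row" instead of failing in reduce, and a default value can be supplied through otherwise for rows that match no rule.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, when}

// Same wrapper as in the answer, repeated so the sketch is self-contained.
case class WhenCondition(valuesToCheck: Seq[(String, Any)], valueIfMatches: Any)

// Hypothetical variant of dynamicWhen: folds the rules into one chained "when" expression.
def foldWhen(rules: Seq[WhenCondition], default: Any = null): Option[Column] = {
  val pairs = rules.map { rule =>
    // AND together all column checks; a rule without checks matches every row
    val predicate = rule.valuesToCheck
      .map { case (name, value) => col(name) === value }
      .reduceOption(_ && _)
      .getOrElse(lit(true))
    (predicate, rule.valueIfMatches)
  }
  // None when there are no rules, as in the answer's dynamicWhen
  pairs.headOption.map { case (firstPredicate, firstValue) =>
    val chained = pairs.tail.foldLeft(when(firstPredicate, firstValue)) {
      case (acc, (predicate, value)) => acc.when(predicate, value)
    }
    // without otherwise, unmatched rows are null; only add it when a default is given
    if (default == null) chained else chained.otherwise(default)
  }
}
```

It can be dropped into the call above in place of dynamicWhen, for example foldWhen(conditions, default = "0") if unmatched rows should get "0" rather than null.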

Finally, a short test with fake data:

val df = Seq(
  ("Derivatives", "Caps", "SPRD", "x", "N"),
  ("Derivatives", "Caps", "SPRD", "TROR", "Y"),
  ("Derivatives", "Caps", "TRSY", "x", "Y"),
  ("Derivatives", "Caps", "TRSY", "TROR", "N"),
  ("Derivatives", "Caps", "zzzz", "x", "N")
).toDF("CoreSectorLevel1Code", "CoreSectorLevel2Code", "FAS157Flavor", "DerivativeType", "InDefaultInd")

val result = ... // transformations above
result.show(false)

+--------------------+--------------------+------------+--------------+------------+------------+
|CoreSectorLevel1Code|CoreSectorLevel2Code|FAS157Flavor|DerivativeType|InDefaultInd|AMGClassRule|
+--------------------+--------------------+------------+--------------+------------+------------+
|Derivatives         |Caps                |SPRD        |x             |N           |1           |
|Derivatives         |Caps                |SPRD        |TROR          |Y           |1           |
|Derivatives         |Caps                |TRSY        |x             |Y           |4           |
|Derivatives         |Caps                |TRSY        |TROR          |N           |3           |
|Derivatives         |Caps                |zzzz        |x             |N           |null        |
+--------------------+--------------------+------------+--------------+------------+------------+

End-of-update

Update 2

Here is an example of reading the conditions from a DataFrame.

Suppose we have the following condition descriptions stored in a DataFrame:

val rulesDf = Seq(
  ("Derivatives", "%", "%", "16"),
  ("Derivatives", "Fx Options", "%", "17"),
  ("Derivatives", "Futures", "%", "48")
).toDF("CoreSectorLevel1Code", "CoreSectorLevel2Code", "FAS157Flavor", "rule")

rulesDf.show(false)

+--------------------+--------------------+------------+----+
|CoreSectorLevel1Code|CoreSectorLevel2Code|FAS157Flavor|rule|
+--------------------+--------------------+------------+----+
|Derivatives         |%                   |%           |16  |
|Derivatives         |Fx Options          |%           |17  |
|Derivatives         |Futures             |%           |48  |
+--------------------+--------------------+------------+----+

We can read and transform them into WhenCondition wrappers using the following:

private def readConditions(): Seq[WhenCondition] = {
  val ruleColumnName = "rule"
  val ruleColumnIndex = rulesDf.schema.fieldIndex(ruleColumnName)
  val conditionColumns = rulesDf.schema.fieldNames.filter(_ != ruleColumnName).toSeq
  rulesDf.rdd.map(row => {
    val valuesToCheck = conditionColumns.map(colName => (colName, row.get(row.fieldIndex(colName))))
    val rule = row(ruleColumnIndex)
    WhenCondition(valuesToCheck, rule)
  }).collect().toSeq
}

readConditions().foreach(println)

// outputs:
WhenCondition(mutable.ArrayBuffer((CoreSectorLevel1Code,Derivatives), (CoreSectorLevel2Code,%), (FAS157Flavor,%)),16)
WhenCondition(mutable.ArrayBuffer((CoreSectorLevel1Code,Derivatives), (CoreSectorLevel2Code,Fx Options), (FAS157Flavor,%)),17)
WhenCondition(mutable.ArrayBuffer((CoreSectorLevel1Code,Derivatives), (CoreSectorLevel2Code,Futures), (FAS157Flavor,%)),48)
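Note that these rules contain "%" values, and dynamicWhen compares every value with ===, so "%" would be matched literally. If "%" is meant as "any value" (an assumption, the question does not say so), one option is to drop those checks before building the conditions. Because the first matching when wins, a general rule such as ("Derivatives", "%", "%") would also shadow the more specific ones listed after it, so the sketch below additionally moves rules with more concrete checks to the front. Both helper names are hypothetical.

```scala
// Same wrapper as in the answer, repeated so the sketch is self-contained.
case class WhenCondition(valuesToCheck: Seq[(String, Any)], valueIfMatches: Any)

// Assumption: "%" means "any value", so that check can simply be dropped.
def dropWildcards(rule: WhenCondition, wildcard: String = "%"): WhenCondition =
  rule.copy(valuesToCheck = rule.valuesToCheck.filterNot { case (_, v) => v == wildcard })

// Assumption: rules with more concrete checks should win, so they go first.
// sortBy is stable, so rules with the same number of checks keep their original order.
def mostSpecificFirst(rules: Seq[WhenCondition]): Seq[WhenCondition] =
  rules.map(dropWildcards(_)).sortBy(rule => -rule.valuesToCheck.size)

// The three rules printed above.
val rules = Seq(
  WhenCondition(Seq(("CoreSectorLevel1Code", "Derivatives"), ("CoreSectorLevel2Code", "%"), ("FAS157Flavor", "%")), "16"),
  WhenCondition(Seq(("CoreSectorLevel1Code", "Derivatives"), ("CoreSectorLevel2Code", "Fx Options"), ("FAS157Flavor", "%")), "17"),
  WhenCondition(Seq(("CoreSectorLevel1Code", "Derivatives"), ("CoreSectorLevel2Code", "Futures"), ("FAS157Flavor", "%")), "48")
)
val ordered = mostSpecificFirst(rules)
```

One caveat: a rule made only of wildcards ends up with no checks at all, and the .reduce(_ && _) in dynamicWhen throws on an empty collection, so such a rule would need a fallback predicate such as lit(true).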

End-of-update 2

Antot answered Jun 15 '26 05:06


