I have Scala code that computes a new column with the withColumn function. My hard-coded version looks something like this:
import org.apache.spark.sql.functions.{col, when}

combinedinputdf
  .withColumn("AMGClassRule",
    when(col("CoreSectorLevel1Code") === "Derivatives" && col("CoreSectorLevel2Code") === "Caps" && col("FAS157Flavor") === "SPRD", "1")
    //.when(col("CoreSectorLevel1Code") === "Derivatives" && col("CoreSectorLevel2Code") === "Caps" && col("FAS157Flavor") === "SPRD", "2")
    .when(col("CoreSectorLevel1Code") === "Derivatives" && col("FAS157Flavor") === "TRSY" && col("DerivativeType") === "TROR", "3")
    .when(col("InDefaultInd") === "Y", "4"))
This works as expected.
But I want to add or modify the when conditions dynamically at runtime, based on a table or a CSV file.
The second table looks like this:
At runtime I could read this table into either a DataFrame or a Map, iterate through its rules, and assign the matching value to my output column.
How do I go about doing this dynamically?
This can be done dynamically by building the when conditions generically from data instead of hard-coding them.
To keep it readable, let's create a dedicated wrapper class:
case class WhenCondition(valuesToCheck: Seq[(String, Any)], valueIfMatches: Any)
The valuesToCheck field contains a sequence of tuples, one for each condition that must hold (they are combined with AND). The first element of each tuple is the column name, the second is the value to match, for example ("CoreSectorLevel1Code", "Derivatives").
valueIfMatches is the second argument passed to when: "1", "3" or "4" in the example.
We will need a function that reads conditions from the source table and returns a sequence of WhenCondition instances:
private def readConditions(args: Any*): Seq[WhenCondition] = {
??? // impl depends on how you read the source data
}
// for the example of the question, this function should return:
val conditions = Seq(
WhenCondition(Seq(("CoreSectorLevel1Code", "Derivatives"), ("CoreSectorLevel2Code", "Caps"), ("FAS157Flavor", "SPRD")), "1"),
WhenCondition(Seq(("CoreSectorLevel1Code", "Derivatives"), ("FAS157Flavor", "TRSY"), ("DerivativeType", "TROR")), "3"),
WhenCondition(Seq(("InDefaultInd", "Y")), "4")
)
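The implementation of readConditions depends on the source. As one possibility, here is a minimal plain-Scala sketch for CSV text, assuming a layout the question does not specify: a header line, the rule value in the last column, every other column a condition column, and an empty cell meaning "no condition on this column". The names CsvRules and readConditionsFromCsv are hypothetical, and the naive split does not handle quoted fields, so a real CSV parser (or spark.read.csv) would be more robust.

```scala
// Hypothetical CSV-based variant of readConditions (plain Scala, no Spark needed).
// Assumed layout: header line first, rule value in the LAST column, all other
// columns are condition columns, an empty cell means "no condition here",
// and no cell contains a quoted comma.
case class WhenCondition(valuesToCheck: Seq[(String, Any)], valueIfMatches: Any)

object CsvRules {
  def readConditionsFromCsv(csv: String): Seq[WhenCondition] = {
    // trim also strips a trailing '\r' from Windows line endings
    val lines = csv.split("\n").map(_.trim).filter(_.nonEmpty).toList
    // limit -1 keeps trailing empty cells instead of dropping them
    val header = lines.head.split(",", -1).map(_.trim).toList
    val conditionColumns = header.init // every column except the rule column
    lines.tail.map { line =>
      val cells = line.split(",", -1).map(_.trim).toList
      require(cells.length == header.length, s"Malformed row: $line")
      // pair each condition column with its cell and drop unconstrained (empty) cells
      val valuesToCheck = conditionColumns.zip(cells.init).filter(_._2.nonEmpty)
      WhenCondition(valuesToCheck, cells.last)
    }
  }
}
```

With the header `CoreSectorLevel1Code,CoreSectorLevel2Code,FAS157Flavor,DerivativeType,InDefaultInd,rule` and the rows `Derivatives,Caps,SPRD,,,1`, `Derivatives,,TRSY,TROR,,3` and `,,,,Y,4`, this produces the same three WhenCondition values as the conditions sequence above.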
Update
We'll also need a function that builds a chain of when calls:
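import scala.annotation.tailrec
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, when}

@tailrec
private def chainWhen(chained: Column, remaining: Seq[(Column, Any)]): Column =
  remaining match {
    // `+:` matches any non-empty Seq (List, Vector, WrappedArray...), unlike `::`, which only matches List
    case head +: tail => chainWhen(chained.when(head._1, head._2), tail)
    case _ => chained
  }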
Now we create a dynamicWhen function that transforms the sequence of WhenCondition objects into a single chained Column:
private def dynamicWhen(parsedConditions: Seq[WhenCondition]): Option[Column] = {
  // first, we transform each WhenCondition object into a tuple of args (Column, Any) for the target "when" function;
  // all (column, value) pairs of one WhenCondition are combined with AND
  val conditions = parsedConditions.map(whenCondition => {
    val condition = whenCondition.valuesToCheck
      .map(cond => col(cond._1) === cond._2)
      .reduce(_ && _) // note: throws if valuesToCheck is empty
    (condition, whenCondition.valueIfMatches)
  })
  // if there weren't any input conditions, we return None; otherwise we chain the transformations and wrap the result in Some
  conditions match {
    case head +: tail => Some(chainWhen(when(head._1, head._2), tail))
    case _ => None
  }
}
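To make the semantics of the chained expression explicit, here is a Spark-free model of what it computes for a single row, assuming non-null column values: every pair of a WhenCondition must match (the reduce(_ && _)), conditions are tried in order and the first match wins, and no match yields null because no .otherwise(...) is added. WhenModel is a hypothetical name used only for this illustration; in the real job Spark evaluates the Column expression itself.

```scala
// Plain-Scala model of the chained when(...).when(...) expression (illustration only).
case class WhenCondition(valuesToCheck: Seq[(String, Any)], valueIfMatches: Any)

object WhenModel {
  // A row matches a WhenCondition only if ALL of its (column, value) pairs
  // match, mirroring the reduce(_ && _) in dynamicWhen.
  def matches(row: Map[String, Any], cond: WhenCondition): Boolean =
    cond.valuesToCheck.forall { case (column, expected) =>
      row.get(column).contains(expected)
    }

  // Conditions are tried in order and the first match wins; no match gives
  // None, which corresponds to null in Spark when no .otherwise(...) is set.
  def evaluate(row: Map[String, Any], conds: Seq[WhenCondition]): Option[Any] =
    conds.find(matches(row, _)).map(_.valueIfMatches)
}
```

Applied to the five test rows used below, evaluate gives Some("1"), Some("1"), Some("4"), Some("3") and None, which corresponds to the AMGClassRule column in the test output.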
And the original hard-coded call can be replaced with:
// produce the optional chained dynamic "whens".
val whenConditions = dynamicWhen(conditions)
// add it to the DataFrame as a new column, or keep the original DataFrame if there were no "whens".
val result = whenConditions.map(cond => df.withColumn("AMGClassRule", cond))
.getOrElse(df)
Finally, a short test with fake data:
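import spark.implicits._ // needed for toDF (assumes a SparkSession named spark, as in spark-shell)
val df = Seq(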
("Derivatives", "Caps", "SPRD", "x", "N"),
("Derivatives", "Caps", "SPRD", "TROR", "Y"),
("Derivatives", "Caps", "TRSY", "x", "Y"),
("Derivatives", "Caps", "TRSY", "TROR", "N"),
("Derivatives", "Caps", "zzzz", "x", "N")
).toDF("CoreSectorLevel1Code", "CoreSectorLevel2Code", "FAS157Flavor", "DerivativeType", "InDefaultInd")
val result = ... // transformations above
result.show(false)
+--------------------+--------------------+------------+--------------+------------+------------+
|CoreSectorLevel1Code|CoreSectorLevel2Code|FAS157Flavor|DerivativeType|InDefaultInd|AMGClassRule|
+--------------------+--------------------+------------+--------------+------------+------------+
|Derivatives         |Caps                |SPRD        |x             |N           |1           |
|Derivatives         |Caps                |SPRD        |TROR          |Y           |1           |
|Derivatives         |Caps                |TRSY        |x             |Y           |4           |
|Derivatives         |Caps                |TRSY        |TROR          |N           |3           |
|Derivatives         |Caps                |zzzz        |x             |N           |null        |
+--------------------+--------------------+------------+--------------+------------+------------+
End-of-update
Update 2
An example of reading the conditions from a DataFrame.
Suppose we have the following condition descriptions stored in a DataFrame:
val rulesDf = Seq(
("Derivatives", "%", "%", "16"),
("Derivatives", "Fx Options", "%", "17"),
("Derivatives", "Futures", "%", "48")
).toDF("CoreSectorLevel1Code", "CoreSectorLevel2Code", "FAS157Flavor", "rule")
rulesDf.show(false)
+--------------------+--------------------+------------+----+
|CoreSectorLevel1Code|CoreSectorLevel2Code|FAS157Flavor|rule|
+--------------------+--------------------+------------+----+
|Derivatives         |%                   |%           |16  |
|Derivatives         |Fx Options          |%           |17  |
|Derivatives         |Futures             |%           |48  |
+--------------------+--------------------+------------+----+
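Note that dynamicWhen compares with ===, so a "%" in rulesDf is matched literally. If "%" is meant as a wildcard (an assumption; the question does not say so), one option is to build each condition with Column.like instead, i.e. col(cond._1).like(cond._2.toString). Because the first matching when wins, the catch-all rule 16 listed first would then shadow rules 17 and 48, so more specific rules should come first. The plain-Scala sketch below models both points; WildcardRules, likeToRegex, firstMatch and bySpecificity are hypothetical names, and escape characters inside patterns are not handled.

```scala
// Plain-Scala model of wildcard rules, ASSUMING "%" means "any value" as in SQL LIKE.
import java.util.regex.Pattern

object WildcardRules {
  type Rule = (Seq[(String, String)], String) // (column -> pattern pairs, rule value)

  // Translate a LIKE-style pattern into a Java regex: % -> .*, _ -> .,
  // every other character quoted literally (escape characters are not handled).
  def likeToRegex(pattern: String): String =
    pattern.toList.map {
      case '%' => ".*"
      case '_' => "."
      case c   => Pattern.quote(c.toString)
    }.mkString

  // All column patterns of a rule must match (AND), as in dynamicWhen.
  def matches(row: Map[String, String], rule: Seq[(String, String)]): Boolean =
    rule.forall { case (column, pattern) =>
      row.get(column).exists(_.matches(likeToRegex(pattern)))
    }

  // First match wins, as in a when(...).when(...) chain.
  def firstMatch(row: Map[String, String], rules: Seq[Rule]): Option[String] =
    rules.collectFirst { case (rule, value) if matches(row, rule) => value }

  // Put more specific rules first: fewer "%" wildcards means more specific.
  // sortBy is stable, so rules with the same count keep their original order.
  def bySpecificity(rules: Seq[Rule]): Seq[Rule] =
    rules.sortBy { case (rule, _) => rule.count(_._2 == "%") }
}
```

For a row with CoreSectorLevel2Code = "Futures", the rules in their original order return "16"; after bySpecificity the order becomes 17, 48, 16 and the same row returns "48".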
We can read and transform them into WhenCondition wrappers using the following:
private def readConditions(): Seq[WhenCondition] = {
val ruleColumnName = "rule"
val ruleColumnIndex = rulesDf.schema.fieldIndex(ruleColumnName)
val conditionColumns = rulesDf.schema.fieldNames.filter(_ != ruleColumnName).toSeq
rulesDf.rdd.map(row => {
val valuesToCheck = conditionColumns.map(colName => (colName, row.get(row.fieldIndex(colName))))
val rule = row(ruleColumnIndex)
WhenCondition(valuesToCheck, rule)
}).collect().toSeq
}
readConditions().foreach(println)
// outputs:
WhenCondition(mutable.ArrayBuffer((CoreSectorLevel1Code,Derivatives), (CoreSectorLevel2Code,%), (FAS157Flavor,%)),16)
WhenCondition(mutable.ArrayBuffer((CoreSectorLevel1Code,Derivatives), (CoreSectorLevel2Code,Fx Options), (FAS157Flavor,%)),17)
WhenCondition(mutable.ArrayBuffer((CoreSectorLevel1Code,Derivatives), (CoreSectorLevel2Code,Futures), (FAS157Flavor,%)),48)
End-of-update 2