I have a DataFrame of transactions joined with a price list:
+----------+----------+------+-------+-------+
| paid | currency | EUR | USD | GBP |
+----------+----------+------+-------+-------+
| 49.5 | EUR | 99 | 79 | 69 |
+----------+----------+------+-------+-------+
A customer has paid 49.5 in EUR, as shown in the "currency" column. I now want to compare that paid price with the price from the price list.
Therefore I need to access the correct column based on the value of "currency", like so:
df.withColumn("saved", df.col(df.col($"currency")) - df.col("paid"))
which I hoped would become
df.withColumn("saved", df.col("EUR") - df.col("paid"))
This fails, however. I tried everything I could imagine, including a UDF, getting nowhere.
I guess there is some elegant solution for this? Can somebody help out here?
Assuming that the column names match the values in the currency column:
import org.apache.spark.sql.functions.{lit, col, coalesce}
import org.apache.spark.sql.Column
// Dummy data
val df = sc.parallelize(Seq(
(49.5, "EUR", 99, 79, 69), (100.0, "GBP", 80, 120, 50)
)).toDF("paid", "currency", "EUR", "USD", "GBP")
// A list of available currencies
val currencies: List[String] = List("EUR", "USD", "GBP")
// Select listed value
val listedPrice: Column = coalesce(
currencies.map(c => when($"currency" === c, col(c)).otherwise(lit(null))): _*)
df.select($"*", (listedPrice - $"paid").alias("difference")).show
// +-----+--------+---+---+---+----------+
// | paid|currency|EUR|USD|GBP|difference|
// +-----+--------+---+---+---+----------+
// | 49.5| EUR| 99| 79| 69| 49.5|
// |100.0| GBP| 80|120| 50| -50.0|
// +-----+--------+---+---+---+----------+
with SQL equivalent of listedPrice
expression being something like this:
COALESCE(
  CASE WHEN (currency = 'EUR') THEN EUR ELSE null END,
  CASE WHEN (currency = 'USD') THEN USD ELSE null END,
  CASE WHEN (currency = 'GBP') THEN GBP ELSE null END
)
An alternative using foldLeft:
import org.apache.spark.sql.functions.when
val listedPriceViaFold = currencies.foldLeft(
lit(null))((acc, c) => when($"currency" === c, col(c)).otherwise(acc))
df.select($"*", (listedPriceViaFold - $"paid").alias("difference")).show
// +-----+--------+---+---+---+----------+
// | paid|currency|EUR|USD|GBP|difference|
// +-----+--------+---+---+---+----------+
// | 49.5| EUR| 99| 79| 69| 49.5|
// |100.0| GBP| 80|120| 50| -50.0|
// +-----+--------+---+---+---+----------+
where listedPriceViaFold translates to the following SQL:
CASE
  WHEN (currency = 'GBP') THEN GBP
  ELSE CASE
    WHEN (currency = 'USD') THEN USD
    ELSE CASE
      WHEN (currency = 'EUR') THEN EUR
      ELSE null
    END
  END
END
Unfortunately, I am not aware of any built-in function which could directly express SQL like this
CASE currency
WHEN 'EUR' THEN EUR
WHEN 'USD' THEN USD
WHEN 'GBP' THEN GBP
ELSE null
END
but you can use this construct in raw SQL.
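For example, the simple CASE form above is accepted by the Spark SQL parser, so it can be used verbatim through selectExpr (a sketch, reusing the df defined earlier):

```scala
// Sketch: the simple CASE form works via selectExpr, even though
// the Column DSL has no direct equivalent.
df.selectExpr(
  "*",
  """CASE currency
       WHEN 'EUR' THEN EUR
       WHEN 'USD' THEN USD
       WHEN 'GBP' THEN GBP
       ELSE null
     END - paid AS difference"""
).show
```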
If my assumption is not true, you can simply add a mapping between column names and the values in the currency column.
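For instance (a sketch with a hypothetical currencyToColumn mapping; these value names are invented for illustration, not taken from the data above):

```scala
import org.apache.spark.sql.functions.{coalesce, col, when}
import org.apache.spark.sql.Column

// Hypothetical mapping from values in the currency column to price columns
val currencyToColumn: Map[String, String] = Map(
  "euro" -> "EUR", "dollar" -> "USD", "pound" -> "GBP")

// when(...) without otherwise yields null for non-matching rows,
// so coalesce picks the single matching branch
val listedPriceMapped: Column = coalesce(
  currencyToColumn.toSeq.map { case (value, colName) =>
    when(col("currency") === value, col(colName))
  }: _*)
```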
Edit:
Another option, which can be efficient if the source supports predicate pushdown and column pruning, is to subset by currency and union:
currencies.map(
// for each currency filter and add difference
c => df.where($"currency" === c).withColumn("difference", $"paid" - col(c))
).reduce((df1, df2) => df1.unionAll(df2)) // Union (use union in Spark 2.0+)
It is equivalent to SQL like this:
SELECT *, EUR - paid AS difference FROM df WHERE currency = 'EUR'
UNION ALL
SELECT *, USD - paid AS difference FROM df WHERE currency = 'USD'
UNION ALL
SELECT *, GBP - paid AS difference FROM df WHERE currency = 'GBP'