I have a JSON file like the one below:
{"name":"method1","parameter1":"P1name","parameter2": 1.0}
I am loading my JSON file like this:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("C:/Users/test/Desktop/te.txt")
scala> df.show()
+-------+----------+----------+
|   name|parameter1|parameter2|
+-------+----------+----------+
|method1|    P1name|       1.0|
+-------+----------+----------+
I have a function like below:
def method1(P1: String, P2: Double) = {
  print(P1)
  print(P2)
}
I want to call method1 based on the value in the name column. After executing the code below, method1 should run:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
df.withColumn("methodCalling", when($"name" === "method1", method1($"parameter1",$"parameter2")).otherwise(when($"name" === "method2", method2($"parameter1",$"parameter2")))).show(false)
But I am getting the error below:
<console>:63: error: type mismatch;
found : org.apache.spark.sql.ColumnName
required: String
Please let me know how to convert the org.apache.spark.sql.ColumnName data type to String.
When you pass arguments as
method1($"parameter1",$"parameter2")
you are passing columns to the function, not primitive values. So I would suggest changing method1 and method2 into udf functions if you want to work with primitive data types inside them. A udf function also has to return a value for each row of the new column.
import org.apache.spark.sql.functions._
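// Wrapping the logic in udf lets Spark pass each row's column values in as String and Double.
val method1 = udf((P1: String, P2: Double) => {
  print(P1) // executed by Spark while it processes rows
  print(P2)
  P1 + P2   // value stored in the new column
})
val method2 = udf((P1: String, P2: Double) => {
  print(P1)
  print(P2)
  P1 + P2
})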
Then your withColumn call should work properly:
df.withColumn("methodCalling", when($"name" === "method1", method1($"parameter1",$"parameter2")).otherwise(when($"name" === "method2", method2($"parameter1",$"parameter2")))).show(false)
Note: udf functions serialize and deserialize data to convert column values for row-wise processing, which adds overhead and increases memory usage. Built-in Spark functions should be used wherever possible.
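As a rough illustration of that note: if the only goal is to concatenate the two parameters (which is what P1+P2 does in the udf versions), the same column can probably be built with built-in functions alone. This is a minimal sketch, assuming Spark 2.x or later (it uses SparkSession rather than the SQLContext from the question); the object name, the addMethodCalling helper, and the in-memory sample row are hypothetical and stand in for the JSON file.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, concat, when}

// Hypothetical object name; a sketch, not the answer author's code.
object BuiltinInsteadOfUdf {

  // Hypothetical helper: adds methodCalling using only built-in column functions.
  def addMethodCalling(df: DataFrame): DataFrame = {
    // Same result as P1 + P2 in the udf: the Double is cast to a string and appended.
    val joined = concat(col("parameter1"), col("parameter2").cast("string"))
    df.withColumn(
      "methodCalling",
      when(col("name") === "method1", joined)
        .when(col("name") === "method2", joined) // rows matching neither branch get null
    )
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local SparkSession (Spark 2.x+).
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("builtin-sketch")
      .getOrCreate()
    import spark.implicits._

    // Sample row mirroring the JSON in the question.
    val df = Seq(("method1", "P1name", 1.0)).toDF("name", "parameter1", "parameter2")
    addMethodCalling(df).show(false)

    spark.stop()
  }
}
```

Built-in functions cannot print from inside the expression, so this only fits when the printing in method1 and method2 is for debugging and not part of the required behavior.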
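You can try it like this. Note that method1 and method2 are called on the driver here, with the values collected from the first row, and the Int they return is used as a literal for every matching row: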
scala> def method1(P1:String, P2:Double): Int = {
| println(P1)
| println(P2)
| 0
| }
scala> def method2(P1:String, P2:Double): Int = {
| println(P1)
| println(P2)
| 1
| }
df.withColumn("methodCalling", when($"name" === "method1", method1(df.select($"parameter1").map(_.getString(0)).collect.head,df.select($"parameter2").map(_.getDouble(0)).collect.head))
.otherwise(when($"name" === "method2", method2(df.select($"parameter1").map(_.getString(0)).collect.head,df.select($"parameter2").map(_.getDouble(0)).collect.head)))).show
//output
P1name
1.0
+-------+----------+----------+-------------+
|   name|parameter1|parameter2|methodCalling|
+-------+----------+----------+-------------+
|method1|    P1name|       1.0|            0|
+-------+----------+----------+-------------+
You have to return something from your method; otherwise it will return Unit, and you will get this error after the values are printed:
java.lang.RuntimeException: Unsupported literal type class scala.runtime.BoxedUnit ()
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
at org.apache.spark.sql.functions$.lit(functions.scala:101)
at org.apache.spark.sql.functions$.when(functions.scala:1245)
... 50 elided
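The difference between the two method shapes can be seen in plain Scala, without Spark. This is a small sketch with hypothetical names (ReturnTypeSketch, printsOnly, printsAndReturns): a method whose last expression is a print call is inferred to return Unit, while one that ends in a value and declares Int returns something `when` can turn into a literal.

```scala
// Hypothetical names throughout; plain Scala, no Spark required.
object ReturnTypeSketch {

  // No declared result type and print as the last expression:
  // the compiler infers Unit, as with the method1 in the question.
  def printsOnly(p1: String, p2: Double) = {
    print(p1)
    print(p2)
  }

  // Declared Int result type: the last expression (0) is the returned value.
  def printsAndReturns(p1: String, p2: Double): Int = {
    println(p1)
    println(p2)
    0
  }

  def main(args: Array[String]): Unit = {
    val nothingUseful: Unit = printsOnly("P1name", 1.0) // compiles only because the result is Unit
    val status: Int = printsAndReturns("P1name", 1.0)   // an Int that can be used as a column literal
    println(s"status = $status")
  }
}
```

Declaring the result type explicitly, as the methods in this answer do, makes the compiler report a missing return value at the definition instead of leaving it to surface later inside Spark.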
Thanks.