How to convert org.apache.spark.sql.ColumnName to String or Decimal type in Spark Scala?

I have a JSON file like the one below:

{"name":"method1","parameter1":"P1name","parameter2": 1.0}

I am loading my JSON file:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("C:/Users/test/Desktop/te.txt") 
scala> df.show()
+-------+----------+----------+
|   name|parameter1|parameter2|
+-------+----------+----------+
|method1|    P1name|       1.0|
+-------+----------+----------+

I have a function like below:

def method1(P1: String, P2: Double) = {
  print(P1)
  print(P2)
}

I am calling method1 based on the name column; after executing the code below, it should execute method1:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
df.withColumn("methodCalling",
  when($"name" === "method1", method1($"parameter1", $"parameter2"))
    .otherwise(when($"name" === "method2", method2($"parameter1", $"parameter2")))
).show(false)

But I am getting the error below.

<console>:63: error: type mismatch;
 found   : org.apache.spark.sql.ColumnName
 required: String

Please let me know how to convert the org.apache.spark.sql.ColumnName data type to String.

asked Jan 30 '23 by Sai

2 Answers

When you pass arguments as

method1($"parameter1", $"parameter2")

you are passing columns to the function, not primitive datatypes. So I would suggest changing method1 and method2 to udf functions if you want to apply primitive-datatype manipulations inside them. A udf function also has to return a value for each row of the new column.

import org.apache.spark.sql.functions._

// udf lifts the (String, Double) => String logic so Spark can apply it per row
def method1 = udf((P1: String, P2: Double) => {
  print(P1)
  print(P2)
  P1 + P2
})

def method2 = udf((P1: String, P2: Double) => {
  print(P1)
  print(P2)
  P1 + P2
})

Then your withColumn call should work properly:

df.withColumn("methodCalling",
  when($"name" === "method1", method1($"parameter1", $"parameter2"))
    .otherwise(when($"name" === "method2", method2($"parameter1", $"parameter2")))
).show(false)

Note: udf functions serialize and deserialize the column data so it can be processed row-wise, which adds complexity, overhead, and memory usage. Built-in Spark functions should be used as much as possible.
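
For instance, since method1 and method2 above just concatenate the two parameters, the same result could be obtained with built-in column functions alone (a minimal sketch, assuming plain concatenation really is all the methods need to do):

import org.apache.spark.sql.functions._

// Same logic with built-in expressions: no udf, so no per-row
// serialization round-trip through Scala objects
df.withColumn("methodCalling",
  when($"name" === "method1" || $"name" === "method2",
    concat($"parameter1", $"parameter2".cast("string")))
).show(false)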

answered Feb 16 '23 by Ramesh Maharjan


You can try it like this:

scala> def method1(P1:String, P2:Double): Int = {
     |   println(P1)
     |   println(P2)
     |   0
     | }

scala> def method2(P1:String, P2:Double): Int = {
     |   println(P1)
     |   println(P2)
     |   1
     | }

df.withColumn("methodCalling",
  when($"name" === "method1",
    method1(df.select($"parameter1").map(_.getString(0)).collect.head,
            df.select($"parameter2").map(_.getDouble(0)).collect.head))
    .otherwise(when($"name" === "method2",
      method2(df.select($"parameter1").map(_.getString(0)).collect.head,
              df.select($"parameter2").map(_.getDouble(0)).collect.head)))
).show

// output

P1name
1.0
+-------+----------+----------+-------------+
|   name|parameter1|parameter2|methodCalling|
+-------+----------+----------+-------------+
|method1|    P1name|       1.0|            0|
+-------+----------+----------+-------------+

You have to return something from your method; otherwise it returns Unit, and you will get an error after the result is printed:

java.lang.RuntimeException: Unsupported literal type class scala.runtime.BoxedUnit ()
  at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:75)
  at org.apache.spark.sql.functions$.lit(functions.scala:101)
  at org.apache.spark.sql.functions$.when(functions.scala:1245)
  ... 50 elided
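
Note that collect.head pulls only the first row's values onto the driver, so the call above works because this DataFrame has a single row. For per-row evaluation you could wrap the same methods in udfs instead (a sketch reusing the method1 and method2 defined above):

import org.apache.spark.sql.functions.udf

// Hypothetical per-row variant: lifting the driver-side methods into udfs
// lets Spark apply them to every row rather than to one collected value
val method1Udf = udf(method1 _)
val method2Udf = udf(method2 _)

df.withColumn("methodCalling",
  when($"name" === "method1", method1Udf($"parameter1", $"parameter2"))
    .otherwise(when($"name" === "method2", method2Udf($"parameter1", $"parameter2")))
).show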

Thanks.

answered Feb 16 '23 by Learner