 

Passing two columns to a udf in scala?

I have a DataFrame containing two columns: one is the data and the other is the character count of that data field.

Data    Count
Hello   5
How     3
World   5

I want to change the value of the Data column based on the value in the Count column. How can this be achieved? I tried this using a udf:

invalidrecords.withColumn("value",appendDelimiterError(invalidrecords("value"),invalidrecords("a_cnt")))

This seems to fail. Is this the correct way to do it?

Asked by Osy on Jul 07 '17


1 Answer

Here's an easy way of doing it.

First, create a DataFrame:

import sqlContext.implicits._
val invalidrecords = Seq(
  ("Hello", 5),
  ("How", 3),
  ("World", 5)
).toDF("Data", "Count")

You should have:

+-----+-----+
|Data |Count|
+-----+-----+
|Hello|5    |
|How  |3    |
|World|5    |
+-----+-----+

Then define the udf function as:

import org.apache.spark.sql.functions._

// placeholder logic: returns a fixed string for every row
def appendDelimiterError = udf((data: String, count: Int) => "value with error")

And call it using withColumn as:

invalidrecords.withColumn("value",appendDelimiterError(invalidrecords("Data"),invalidrecords("Count"))).show(false)

You should have output as:

+-----+-----+----------------+
|Data |Count|value           |
+-----+-----+----------------+
|Hello|5    |value with error|
|How  |3    |value with error|
|World|5    |value with error|
+-----+-----+----------------+

You can write your own logic inside the udf function instead of returning a fixed string.
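
For instance, here is a minimal sketch of a udf whose result actually uses both columns. The function name flagShortValues, the threshold of 5, and the output format are just illustrative assumptions, not part of the question:

import org.apache.spark.sql.functions._

// Hypothetical example: build a "value" column from both inputs,
// flagging rows whose Count is below an assumed threshold of 5.
def flagShortValues = udf((data: String, count: Int) =>
  if (count < 5) s"$data|TOO_SHORT($count)" else s"$data|OK($count)"
)

invalidrecords
  .withColumn("value", flagShortValues(invalidrecords("Data"), invalidrecords("Count")))
  .show(false)

The shape is always the same: the udf takes one Scala argument per column you pass in, and you hand it the columns in that order.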

Edited

Answering your requirement in the comment below requires changing the udf function and the withColumn call as below:

def appendDelimiterError = udf((data: String, count: Int) => {
  if(count < 5) s"convert value to ${data} - error"
  else data
} )

invalidrecords.withColumn("Data",appendDelimiterError(invalidrecords("Data"),invalidrecords("Count"))).show(false)

You should have output as:

+----------------------------+-----+
|Data                        |Count|
+----------------------------+-----+
|Hello                       |5    |
|convert value to How - error|3    |
|World                       |5    |
+----------------------------+-----+
Answered by Ramesh Maharjan on Oct 22 '22