I was wondering whether it is possible to create a UDF that receives two arguments, a Column and another variable (an Object, a Dictionary, or any other type), performs some operations, and returns the result.
I tried this but got an exception, so I'm wondering whether there is any way around the problem.
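from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
df = sqlContext.createDataFrame([("Bonsanto", 20, 2000.00),
                                 ("Hayek", 60, 3000.00),
                                 ("Mises", 60, 1000.0)],
                                ["name", "age", "balance"])
comparatorUDF = udf(lambda c, n: c == n, BooleanType())
# Raises AnalysisException: the plain string "Bonsanto" is resolved as a column name
df.where(comparatorUDF(col("name"), "Bonsanto")).show()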
And I get the following error:
AnalysisException: u"cannot resolve 'Bonsanto' given input columns name, age, balance;"
So the UDF clearly treats the string "Bonsanto" as a column name, whereas I'm actually trying to compare each record's value with the second argument.
On the other hand, I know it's possible to use operators directly inside a where clause, as follows (but I want to know whether the same is achievable with a UDF):
df.where(col("name") == "Bonsanto").show()
#+--------+---+-------+
#| name|age|balance|
#+--------+---+-------+
#|Bonsanto| 20| 2000.0|
#+--------+---+-------+
To add a new column with a constant value to a DataFrame in PySpark, use the lit() function (from pyspark.sql.functions import lit). lit() takes the constant value you want to add and returns a Column; to add a NULL/None column, use lit(None).
When we use UDFs, we lose many of the optimizations Spark performs on our DataFrame/Dataset, because a UDF is a black box to Spark's optimizer. A typical example is predicate pushdown, a general optimization applied when reading data from a database or from columnar file formats such as Parquet: a filter written as a UDF cannot be pushed down to the data source.
A UDF can only operate on the records of a single DataFrame; in the broadest case, when the UDF is a user-defined aggregate function (UDAF), that is the entire DataFrame. If you want to work with more than one DataFrame in a UDF, you have to join the DataFrames first so that all the columns the UDF needs are available.
Everything that is passed to a UDF is interpreted as a column or column name. If you want to pass a literal, you have two options:
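Pass the argument using currying:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
def comparatorUDF(n):
    # n is captured by the closure; only c comes from the column
    return udf(lambda c: c == n, BooleanType())
df.where(comparatorUDF("Bonsanto")(col("name"))).show()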
This can be used with an argument of any type as long as it is serializable.
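To see why currying works, here is a plain-Python sketch (no Spark needed) of the kind of closure the curried comparatorUDF builds: the literal is captured by the inner function, so only the column value has to be supplied for each row. The helper names `make_comparator` and `make_membership_check` and the sample values are illustrative assumptions, not part of any API; in PySpark you would wrap the inner function with `udf(..., BooleanType())` as in the snippet above.

```python
import pickle
from typing import Any, Callable, Collection


def make_comparator(n: Any) -> Callable[[Any], bool]:
    """Return a one-argument function that closes over the literal `n`.

    Only the column value `c` would come from the DataFrame; `n` travels
    inside the closure instead of being passed as a column.
    """
    def compare(c: Any) -> bool:
        return c == n
    return compare


def make_membership_check(allowed: Collection[Any]) -> Callable[[Any], bool]:
    """Same idea with a non-atomic argument, e.g. a dict (checks its keys)."""
    def check(c: Any) -> bool:
        return c in allowed
    return check


# Plain-Python stand-in for the values of the `name` column.
names = ["Bonsanto", "Hayek", "Mises"]

is_bonsanto = make_comparator("Bonsanto")
matches = [c for c in names if is_bonsanto(c)]
print(matches)  # ['Bonsanto']

# Hypothetical dictionary argument (keys are names, values are ages).
allowed = {"Hayek": 60, "Mises": 60}
is_allowed = make_membership_check(allowed)
print([c for c in names if is_allowed(c)])  # ['Hayek', 'Mises']

# PySpark pickles the function (using cloudpickle) to ship it to the
# executors, so whatever the closure captures must be picklable.
assert pickle.loads(pickle.dumps(allowed)) == allowed
```

For Spark, the equivalent would be `udf(make_membership_check(allowed), BooleanType())` applied to `col("name")`.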
Use a SQL literal with the current (two-argument) implementation:
from pyspark.sql.functions import lit
df.where(comparatorUDF(col("name"), lit("Bonsanto")))
This works only with supported atomic types (strings, numerics, booleans). For non-atomic types, see "How to add a constant column in a Spark DataFrame?"
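For a dictionary, one common approach is to build a map literal with `create_map`. You flatten the dict into alternating keys and values, wrap each element in `lit()`, and pass the resulting column to the UDF. This is a sketch under assumptions: only the plain-Python flattening step runs here, and the PySpark lines are left as comments because they need a Spark session and the `df` from the question. `flatten_mapping` and `lookupUDF` are hypothetical names.

```python
from itertools import chain
from typing import Any, Dict, List


def flatten_mapping(mapping: Dict[Any, Any]) -> List[Any]:
    """Flatten {k1: v1, k2: v2} into [k1, v1, k2, v2].

    create_map expects its arguments in this alternating key/value order.
    """
    return list(chain.from_iterable(mapping.items()))


# Illustrative input taken from the question's sample rows.
balances_by_name = {"Bonsanto": 2000.0, "Hayek": 3000.0}
flat = flatten_mapping(balances_by_name)
print(flat)  # ['Bonsanto', 2000.0, 'Hayek', 3000.0]

# In PySpark (not executed here), each element would be wrapped in lit():
#   from pyspark.sql.functions import create_map, lit
#   mapping_col = create_map(*[lit(x) for x in flat])
#   df.where(lookupUDF(col("name"), mapping_col))  # lookupUDF is hypothetical
# Inside a Python UDF, a map column arrives as a regular Python dict.
```

All keys must share one Spark type, and all values must share one Spark type, for the map literal to be valid.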