Spark: how to use a UDF with a join

I'd like to use a specific UDF in a join with Spark.

Here's the plan:

I have a table A (10 million rows) and a table B (15 million rows).

I'd like to use a UDF to compare one element of table A with one of table B. Is that possible?

Here's a sample of my code. At some point I also need to require that the result of my compare UDF be greater than 0.9:

DataFrame dfr = df
        .select("name", "firstname", "adress1", "city1", "compare(adress1, adress2)")
        .join(dfa, df.col("adress1").equalTo(dfa.col("adress2"))
                .and(df.col("city1").equalTo(dfa.col("city2")))
                ...;

Is it possible?

Jean asked Aug 16 '17 16:08

People also ask

Why UDF is not recommended in Spark?

When we use UDFs, we lose the optimizations Spark applies to our DataFrame/Dataset: a UDF is a black box to Spark's optimizer. Consider, for example, a common optimization applied when reading data from a database or from columnar file formats such as Parquet: predicate pushdown.
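
A rough sketch of the difference, assuming a SparkSession named spark and a hypothetical Parquet dataset at /data/people with an age column:

import org.apache.spark.sql.functions.udf
import spark.implicits._

// Native predicate: Catalyst can push this filter down into the Parquet scan
val adults = spark.read.parquet("/data/people").filter($"age" > 21)

// The same logic wrapped in a UDF is opaque to the optimizer,
// so every row is read first and only filtered afterwards
val isAdult = udf((age: Int) => age > 21)
val adultsViaUdf = spark.read.parquet("/data/people").filter(isAdult($"age"))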

Can I use UDF in Spark SQL?

In Spark, you create a UDF by writing a function in the language you use with Spark. For example, if you use Spark with Scala, you write the function in Scala and then either wrap it with udf() to use it on a DataFrame or register it to use it in SQL.
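
A minimal sketch of both variants, assuming a SparkSession named spark, a DataFrame df with a name column, and a registered temp view people (all hypothetical names):

import org.apache.spark.sql.functions.udf
import spark.implicits._

// Wrap a plain Scala function with udf() for DataFrame use
val toUpper = udf((s: String) => s.toUpperCase)
df.select(toUpper($"name"))

// Register the same function under a name for use in Spark SQL
spark.udf.register("to_upper", (s: String) => s.toUpperCase)
spark.sql("SELECT to_upper(name) FROM people")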

What is Leftanti join in Spark?

The left anti join in PySpark works like a regular join, but it returns only the rows (and columns) of the left DataFrame that have no match in the right DataFrame.
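
The Scala equivalent is a join with the "left_anti" join type (dfLeft, dfRight, and the key column k are hypothetical):

// Keeps only the rows of dfLeft whose key has no match in dfRight
val unmatched = dfLeft.join(dfRight, dfLeft("k") === dfRight("k"), "left_anti")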

Is join an action in Spark?

Join is supposed to be a transformation, not an action.
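A quick sketch of that laziness (hypothetical DataFrames dfA and dfB sharing a key column id):

// The join only builds a logical plan; nothing executes yet
val joined = dfA.join(dfB, dfA("id") === dfB("id"))

// Execution is triggered by an action such as count() or show()
joined.count()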


1 Answer

Yes, you can. However, it will be slower than standard operators, because Spark will not be able to do predicate pushdown.

Example:

// here compute similarity between x and y, returning a Double score
val similarityUdf = org.apache.spark.sql.functions.udf((x: String, y: String) => (??? : Double))
val df3 = df1.join(df2, similarityUdf(df1("field1"), df2("field1")) > 0.9)

For example:

// in spark-shell; otherwise import spark.implicits._ for toDF
val df1 = Seq(1, 2, 3, 4).toDF("x")
val df2 = Seq(1, 3, 7, 11).toDF("q")
val distanceUdf = org.apache.spark.sql.functions.udf((x: Int, q: Int) => Math.abs(x - q))
val df3 = df1.join(df2, distanceUdf(df1("x"), df2("q")) > 1)

You can also return a Boolean directly from the user-defined function and use it as the join condition.
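
For instance, a sketch reusing df1 and df2 from the example above, with the threshold moved inside the UDF:

// Boolean-returning UDF used directly as the join condition
val farEnough = org.apache.spark.sql.functions.udf((x: Int, q: Int) => Math.abs(x - q) > 1)
val df4 = df1.join(df2, farEnough(df1("x"), df2("q")))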

T. Gawęda answered Sep 22 '22 15:09