Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use a broadcast collection in a udf?

How to use a broadcast collection in Spark SQL 1.6.1 udf. Udf should be called from Main SQL as shown below

sqlContext.sql("""Select col1,col2,udf_1(key) as value_from_udf FROM table_a""")

udf_1() should be looking through a broadcast small collection to return value to main sql.

like image 528
S. K Avatar asked Nov 18 '16 09:11

S. K


People also ask

How do you use broadcast variable in Spark UDF?

The broadcast variable is a wrapper around v , and its value can be accessed by calling the Value() method. Example: string v = "Variable to be broadcasted"; Broadcast<string> bv = SparkContext. Broadcast(v); // Using the broadcast variable in a UDF: Func<Column, Column> udf = Udf<string, string>( str => $"{str}: {bv.

Why UDF are not recommended in Spark?

1)When we use UDFs we end up losing all the optimization Spark does on our Dataframe/Dataset. When we use a UDF, it is as good as a Black box to Spark's optimizer. Let's consider an example of a general optimization when reading data from Database or columnar format files such as Parquet is PredicatePushdown.

How do you broadcast a function in PySpark?

The PySpark Broadcast variable is created using the "broadcast(v)" method of SparkContext class. This method takes argument "v" that is to be broadcasted.

How do I get broadcast variable in Spark?

The variable "power" is declared as BroadCast Variable by using the broadcast() function. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task.


1 Answers

Here's a minimal reproducible example in pySpark, illustrating the use of broadcast variables to perform lookups, employing a lambda function as an UDF inside a SQL statement.

# Create dummy data and register as table
df = sc.parallelize([
    (1,"a"),
    (2,"b"),
    (3,"c")]).toDF(["num","let"])
df.registerTempTable('table')

# Create broadcast variable from local dictionary
myDict = {1: "y", 2: "x", 3: "z"}
broadcastVar = sc.broadcast(myDict) 
# Alternatively, if your dict is a key-value rdd, 
# you can do sc.broadcast(rddDict.collectAsMap())

# Create lookup function and apply it
sqlContext.registerFunction("lookup", lambda x: broadcastVar.value.get(x))
sqlContext.sql('select num, let, lookup(num) as test from table').show()
+---+---+----+
|num|let|test|
+---+---+----+
|  1|  a|   y|
|  2|  b|   x|
|  3|  c|   z|
+---+---+----+
like image 197
mtoto Avatar answered Dec 08 '22 06:12

mtoto