 

Spark UDF initialization

I want to create a custom regex-based UDF in Spark SQL. My preference would be to keep a memory-resident

 Map[String, Pattern]

where each Pattern is the compiled regex for its string key. But to do this, the map creation needs to happen in an "initialize" function of the UDF.

So, does Spark provide any mechanism for a UDF to maintain persistent state across invocations (via Spark SQL)?

Note that Hive does support a UDF lifecycle. I used it to generate parse trees as part of initialization, so that the actual UDF invocations ran against pre-built trees and were lightning fast, with no parsing involved.

asked Dec 14 '22 by WestCoastProjects

1 Answer

Let's start with imports and some dummy data:

import org.apache.spark.sql.functions.udf
import scala.util.matching.Regex
import java.util.regex.Pattern

val df = sc.parallelize(Seq(
  ("foo", "this is bar"), ("foo", "this is foo"),
  ("bar", "foobar"), ("bar", "foo and foo")
)).toDF("type", "value")

and map:

val patterns: Map[String, Pattern] = Seq(("foo", ".*foo.*"), ("bar", ".*bar.*"))
  .map { case (k, v) => (k, new Regex(v).pattern) }
  .toMap
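As a quick standalone sanity check (plain Scala, no Spark required), the compiled patterns can be exercised directly before wiring them into a UDF; this snippet simply rebuilds the same map:

```scala
import java.util.regex.Pattern
import scala.util.matching.Regex

// Same map as above: key -> compiled regex.
val patterns: Map[String, Pattern] = Seq(("foo", ".*foo.*"), ("bar", ".*bar.*"))
  .map { case (k, v) => (k, new Regex(v).pattern) }
  .toMap

println(patterns("foo").matcher("this is foo").matches) // true
println(patterns("foo").matcher("this is bar").matches) // false
println(patterns.get("baz").map(_.matcher("anything").matches)) // None
```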

Now I see two different options:

  • make patterns a broadcast variable referenced inside udf

    val patternsBd = sc.broadcast(patterns)
    
    val typeMatchedViaBroadcast = udf((t: String, v: String) =>
      patternsBd.value.get(t).map(m => m.matcher(v).matches))
    
    df.withColumn("match", typeMatchedViaBroadcast($"type", $"value")).show
    
    // +----+-----------+-----+
    // |type|      value|match|
    // +----+-----------+-----+
    // | foo|this is bar|false|
    // | foo|this is foo| true|
    // | bar|     foobar| true|
    // | bar|foo and foo|false|
    // +----+-----------+-----+
    
  • passing map inside closure

    def makeTypeMatchedViaClosure(patterns: Map[String, Pattern]) = udf(
      (t: String, v: String) => patterns.get(t).map(m => m.matcher(v).matches))
    
    val typeMatchedViaClosure = makeTypeMatchedViaClosure(patterns)
    
    df.withColumn("match", typeMatchedViaClosure($"type", $"value")).show
    
    // +----+-----------+-----+
    // |type|      value|match|
    // +----+-----------+-----+
    // | foo|this is bar|false|
    // | foo|this is foo| true|
    // | bar|     foobar| true|
    // | bar|foo and foo|false|
    // +----+-----------+-----+
    
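A third option, closer in spirit to Hive's initialize-once lifecycle, is a sketch of per-executor lazy initialization (an assumption about the desired setup, not from the answer above): put the compiled map in a `lazy val` inside an object, so each executor JVM compiles the regexes once on first access rather than per row. The object and function names here are hypothetical.

```scala
import java.util.regex.Pattern

object CompiledPatterns {
  // Compiled exactly once per JVM (i.e., once per executor), on first access.
  lazy val patterns: Map[String, Pattern] = Map(
    "foo" -> Pattern.compile(".*foo.*"),
    "bar" -> Pattern.compile(".*bar.*")
  )

  // Returns None when the type has no registered pattern.
  def matches(t: String, v: String): Option[Boolean] =
    patterns.get(t).map(_.matcher(v).matches)
}

// Spark wiring, same shape as the closures above (sketch):
// val typeMatchedViaObject = udf(CompiledPatterns.matches _)
// df.withColumn("match", typeMatchedViaObject($"type", $"value")).show
```

Unlike the broadcast variant, nothing is shipped with the job; each executor rebuilds the map from the class definition, which is fine as long as the pattern set is static.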
answered Jan 09 '23 by zero323