
Using regexp to join two dataframes in spark

Say I have a dataframe df1 with the column "color" that contains a bunch of colors, and another dataframe df2 with column "phrase" that contains various phrases.

I'd like to join the two dataframes wherever the color in df1 appears in a phrase in df2. I cannot use df1.join(df2, df2("phrase").contains(df1("color"))), since that would match the color anywhere within the phrase. I don't want to match words like scaRED, for example, where RED is part of another word. I only want to join when the color appears as a separate word in the phrase.

Can I use a regular expression to solve this? What function can I use, and what is the syntax when I need to reference the column in the expression?

asked Sep 23 '20 by martinr

3 Answers

You could create a regex pattern that wraps the color in word boundaries (\b) and use a regexp_replace check as the join condition:

val df1 = Seq(
  (1, "red"), (2, "green"), (3, "blue")
).toDF("id", "color")

val df2 = Seq(
  "red apple", "scared cat", "blue sky", "green hornet"
).toDF("phrase")

val patternCol = concat(lit("\\b"), df1("color"), lit("\\b"))

df1.join(df2, regexp_replace(df2("phrase"), patternCol, lit("")) =!= df2("phrase")).
  show
// +---+-----+------------+
// | id|color|      phrase|
// +---+-----+------------+
// |  1|  red|   red apple|
// |  3| blue|    blue sky|
// |  2|green|green hornet|
// +---+-----+------------+

Note that "scared cat" would have been a match in the absence of the enclosing word boundaries.
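For intuition, here is the same word-boundary logic in plain Scala (outside Spark); colorInPhrase is just an illustrative name, not part of any API:

```scala
// Illustration of the word-boundary regex used in the join condition above:
// a color counts as present only when it appears as a whole word.
def colorInPhrase(color: String, phrase: String): Boolean =
  ("\\b" + color + "\\b").r.findFirstIn(phrase).isDefined

colorInPhrase("red", "red apple")  // true
colorInPhrase("red", "scared cat") // false: "red" inside "scared" has no word boundary
```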

answered Oct 15 '22 by Leo C

Building up on your own solution, you can also try this:

df1.join(df2, array_contains(split(df2("phrase"), " "), df1("color")))
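Stripped of Spark, the condition amounts to this plain-Scala check (colorInPhrase is a hypothetical name for illustration). Note that exact word membership means punctuation glued to a word would not match:

```scala
// Plain-Scala equivalent of split + array_contains: exact word membership.
def colorInPhrase(color: String, phrase: String): Boolean =
  phrase.split(" ").contains(color)

colorInPhrase("red", "red apple")    // true
colorInPhrase("red", "scared cat")   // false
colorInPhrase("red", "a red, apple") // false: "red," is not exactly "red"
```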
answered Oct 15 '22 by jrook


I did not see your data, but this is a starter with a little variation. No need for regex as far as I can see, but who knows:

// You need to do some parsing, like stripping "." and "?" and normalizing case
// You did not provide an example of the JOIN

import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

// UDF: case-insensitive check for whether the array of words contains the value
val checkValue = udf { (array: WrappedArray[String], value: String) =>
  array.iterator.map(_.toLowerCase).contains(value.toLowerCase)
}

// Generate some data
val dfCompare = spark.sparkContext.parallelize(Seq("red", "blue", "gold", "cherry")).toDF("color")
val rdd = spark.sparkContext.parallelize(Seq(
  ("red", "hello how are you red", 10),
  ("blue", "I am fine but blue", 20),
  ("cherry", "you need to do some parsing and I like cherry", 30),
  ("thebluephantom", "you need to do some parsing and I like fanta", 30)
))
val df = rdd.toDF()
// Split each phrase into an array of words for the membership check
val df2 = df.withColumn("_4", split($"_2", " "))
df2.show(false)
dfCompare.show(false)
val res = df2.join(dfCompare, checkValue(df2("_4"), dfCompare("color")), "inner")
res.show(false)

returns:

+------+---------------------------------------------+---+--------------------------------------------------------+------+
|_1    |_2                                           |_3 |_4                                                      |color |
+------+---------------------------------------------+---+--------------------------------------------------------+------+
|red   |hello how are you red                        |10 |[hello, how, are, you, red]                             |red   |
|blue  |I am fine but blue                           |20 |[I, am, fine, but, blue]                                |blue  |
|cherry|you need to do some parsing and I like cherry|30 |[you, need, to, do, some, parsing, and, I, like, cherry]|cherry|
+------+---------------------------------------------+---+--------------------------------------------------------+------+
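The heart of this answer is the checkValue UDF: a case-insensitive membership test over the split words. Its logic, stripped of Spark (a sketch reusing the checkValue name):

```scala
// Case-insensitive membership test, mirroring the checkValue UDF above.
def checkValue(words: Seq[String], value: String): Boolean =
  words.exists(_.equalsIgnoreCase(value))

checkValue(Seq("hello", "how", "are", "you", "red"), "RED") // true
checkValue(Seq("I", "am", "fine"), "red")                   // false
```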
answered Oct 15 '22 by thebluephantom