Say I have a dataframe df1 with a column "color" that contains a bunch of colors, and another dataframe df2 with a column "phrase" that contains various phrases.
I'd like to join the two dataframes where the color in df1 appears in the phrase in df2. I cannot use df1.join(df2, df2("phrase").contains(df1("color"))), since that would match the color anywhere within the phrase. I don't want to match on words like scaRED, for example, where RED is part of another word. I only want to join when the color appears as a separate word in the phrase.
Can I use a regular expression to solve this? Which function can I use, and what is the syntax when I need to reference the column in the expression?
You could create a regex pattern that checks for word boundaries (\b) when matching the colors, and use a regexp_replace check as the join condition:
import org.apache.spark.sql.functions._
import spark.implicits._

val df1 = Seq(
  (1, "red"), (2, "green"), (3, "blue")
).toDF("id", "color")
val df2 = Seq(
  "red apple", "scared cat", "blue sky", "green hornet"
).toDF("phrase")

// Wrap each color in word boundaries, e.g. "red" becomes the pattern \bred\b
val patternCol = concat(lit("\\b"), df1("color"), lit("\\b"))

// If stripping the whole-word match changes the phrase, the color occurred as a separate word
df1.join(df2, regexp_replace(df2("phrase"), patternCol, lit("")) =!= df2("phrase")).show
// +---+-----+------------+
// | id|color| phrase|
// +---+-----+------------+
// | 1| red| red apple|
// | 3| blue| blue sky|
// | 2|green|green hornet|
// +---+-----+------------+
Note that "scared cat" would have been a match without the enclosing word boundaries. Also note that the regexp_replace variant that accepts Column arguments for the pattern and replacement requires Spark 3.0+.
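If you would rather test the regex directly instead of replacing and comparing, here is a minimal sketch (assuming the same df1 and df2 as above) that uses rlike through expr; the doubled backslashes are unescaped once by Spark's SQL parser:

// The SQL string literal '\\b' is unescaped to the regex \b by the parser
val matched = df1.join(
  df2,
  expr("""phrase rlike concat('\\b', color, '\\b')""")
)
matched.show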
Building on your own attempt, you can also try this:
df1.join(df2, array_contains(split(df2("phrase"), " "), df1("color")))
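Splitting on a single space is brittle if the phrases carry punctuation or mixed case; a hedged variant of the same idea (my addition, not part of the answer above) splits on runs of non-word characters and lowercases both sides:

df1.join(
  df2,
  array_contains(
    // "\\W+" is a regex: split on any run of non-word characters, dropping punctuation
    split(lower(df2("phrase")), "\\W+"),
    lower(df1("color"))
  )
)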
I did not see your actual data, but this is a starter, with a little variation. No need for a regex as far as I can see, but who knows:
// You may need to do some parsing, e.g. stripping . and ? and normalizing case
// The question did not provide an example of the JOIN
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

// UDF: case-insensitive check whether the split phrase contains the color
val checkValue = udf { (array: WrappedArray[String], value: String) =>
  array.iterator.map(_.toLowerCase).contains(value.toLowerCase)
}

// Gen some data
val dfCompare = spark.sparkContext.parallelize(Seq("red", "blue", "gold", "cherry")).toDF("color")
val rdd = spark.sparkContext.parallelize(Seq(
  ("red", "hello how are you red", 10),
  ("blue", "I am fine but blue", 20),
  ("cherry", "you need to do some parsing and I like cherry", 30),
  ("thebluephantom", "you need to do some parsing and I like fanta", 30)
))
val df = rdd.toDF()

// Split each phrase into words so the UDF can check word-level membership
val df2 = df.withColumn("_4", split($"_2", " "))
df2.show(false)
dfCompare.show(false)

val res = df2.join(dfCompare, checkValue(df2("_4"), dfCompare("color")), "inner")
res.show(false)
returns:
+------+---------------------------------------------+---+--------------------------------------------------------+------+
|_1 |_2 |_3 |_4 |color |
+------+---------------------------------------------+---+--------------------------------------------------------+------+
|red |hello how are you red |10 |[hello, how, are, you, red] |red |
|blue |I am fine but blue |20 |[I, am, fine, but, blue] |blue |
|cherry|you need to do some parsing and I like cherry|30 |[you, need, to, do, some, parsing, and, I, like, cherry]|cherry|
+------+---------------------------------------------+---+--------------------------------------------------------+------+
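On Spark 3.0+, the UDF could also be replaced by the built-in exists higher-order function, which keeps the comparison inside Catalyst; this is a sketch against the same df2 and dfCompare, not part of the original answer:

// exists() tests each word of the _4 array column against the color,
// reproducing the UDF's case-insensitive comparison with built-ins only
val res2 = df2.join(
  dfCompare,
  exists(df2("_4"), word => lower(word) === lower(dfCompare("color"))),
  "inner"
)
res2.show(false)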