I have two RDDs where the first RDD has records of the form
RDD1 = (1, 2017-2-13,"ABX-3354 gsfette"
2, 2017-3-18,"TYET-3423 asdsad"
3, 2017-2-09,"TYET-3423 rewriu"
4, 2017-2-13,"ABX-3354 42324"
5, 2017-4-01,"TYET-3423 aerr")
and the second RDD has records of the form
RDD2 = ('mfr1',"ABX-3354")
('mfr2',"TYET-3423")
I need to find all records in RDD1 whose 3rd column fully or partially matches a value in the 2nd column of RDD2, and get the number of matching records for each RDD2 value.
For this example, the end result would be:
ABX-3354 2
TYET-3423 3
What is the best way to do this?
I am posting a couple of solutions using Spark SQL, focused on accurately matching the search string within the given text.
Solution 1: cross join, then filter
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.count
import spark.implicits._

val df1 = Seq(
  (1, "2017-2-13", "ABX-3354 gsfette"),
  (2, "2017-3-18", "TYET-3423 asdsad"),
  (3, "2017-2-09", "TYET-3423 rewriu"),
  (4, "2017-2-13", "ABX-335442324"), // changed from "ABX-3354 42324"
  (5, "2017-4-01", "aerrTYET-3423")  // changed from "TYET-3423 aerr"
).toDF("id", "dt", "txt")

val df2 = Seq(
  ("mfr1", "ABX-3354"),
  ("mfr2", "TYET-3423")
).toDF("col1", "key")

// Predicate for filter: true when txt contains key as a substring
def matcher(row: Row): Boolean =
  row.getAs[String]("txt").contains(row.getAs[String]("key"))

// Pair every df1 row with every df2 row, then keep only the matching pairs
val join = df1.crossJoin(df2)
val result = join.filter(matcher _)
  .groupBy("key")
  .agg(count("txt").as("count"))
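The cross join followed by a row-level filter can also be written as a single join whose condition is a Column expression. The following is only a sketch under assumptions: the object name `ContainsJoinSketch` and the method `run` are made up for illustration, and the session is passed in explicitly so the snippet does not depend on the `spark` value that spark-shell provides. It relies on `Column.contains`, which does a substring match, so it should give the same counts as the filter above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

// Hypothetical wrapper object, used only to keep the sketch self-contained
object ContainsJoinSketch {

  // Builds the sample data and returns key -> number of matching df1 rows
  def run(spark: SparkSession): Map[String, Long] = {
    import spark.implicits._

    val df1 = Seq(
      (1, "2017-2-13", "ABX-3354 gsfette"),
      (2, "2017-3-18", "TYET-3423 asdsad"),
      (3, "2017-2-09", "TYET-3423 rewriu"),
      (4, "2017-2-13", "ABX-335442324"),
      (5, "2017-4-01", "aerrTYET-3423")
    ).toDF("id", "dt", "txt")

    val df2 = Seq(
      ("mfr1", "ABX-3354"),
      ("mfr2", "TYET-3423")
    ).toDF("col1", "key")

    // Non-equi inner join: keep (row, key) pairs where txt contains key
    val result = df1.join(df2, df1("txt").contains(df2("key")))
      .groupBy("key")
      .agg(count("txt").as("count"))

    // Collect the small result to the driver as a Map
    result.collect().map(r => r.getString(0) -> r.getLong(1)).toMap
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration; in spark-shell, `spark` already exists
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("contains-join-sketch")
      .getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```

Because there is no equality condition, Spark still has to compare every row of `df1` with every key, so this is mainly a readability change and is reasonable only while the key set stays small.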
Solution 2: broadcast the keys and match in a UDF
import org.apache.spark.sql.functions._
import spark.implicits._

val df1 = Seq(
  (1, "2017-2-13", "ABX-3354 gsfette"),
  (2, "2017-3-18", "TYET-3423 asdsad"),
  (3, "2017-2-09", "TYET-3423 rewriu"),
  (4, "2017-2-13", "ABX-3354 42324"),
  (5, "2017-4-01", "aerrTYET-3423"),
  (6, "2017-4-01", "aerrYET-3423")
).toDF("id", "dt", "pattern")

// Small dataset to broadcast; only the second value of each pair is needed
val df2 = Seq(
  ("mfr1", "ABX-3354"),
  ("mfr2", "TYET-3423")
).map(_._2)

// Lookup to use in the UDF
val lookup = spark.sparkContext.broadcast(df2)

// UDF: first key contained in txt, or null when txt is null or nothing matches
val matcher = udf((txt: String) =>
  Option(txt).flatMap(t => lookup.value.find(k => t.contains(k))).orNull
)

val result = df1.withColumn("match", matcher($"pattern"))
  .filter($"match".isNotNull) // not interested in non-matching records
  .groupBy("match")
  .agg(count("pattern").as("count"))

result.show()
+---------+-----+
|    match|count|
+---------+-----+
|TYET-3423|    3|
| ABX-3354|    2|
+---------+-----+
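One difference between the two approaches is worth keeping in mind: the UDF returns only the first key found in a text, so a record that mentions several keys is counted once, whereas the cross join counts it once per matching key. The sketch below models both behaviours on plain Scala collections rather than Spark. The object name `FirstVsAllMatches` and the third sample text, which mentions both keys, are made up for illustration.

```scala
// Hypothetical object illustrating the two counting semantics without Spark
object FirstVsAllMatches {
  val keys = Seq("ABX-3354", "TYET-3423")

  // Sample texts; the last one is invented and mentions both keys
  val texts = Seq(
    "ABX-3354 gsfette",
    "TYET-3423 asdsad",
    "ABX-3354 and TYET-3423"
  )

  // First-match semantics (as in the UDF): each text counts for at most one key
  def firstMatchCounts: Map[String, Int] =
    texts.flatMap(t => keys.find(k => t.contains(k)))
      .groupBy(identity)
      .map { case (k, vs) => k -> vs.size }

  // All-match semantics (as in cross join plus filter): one count per (text, key) pair
  def allMatchCounts: Map[String, Int] =
    (for (t <- texts; k <- keys if t.contains(k)) yield k)
      .groupBy(identity)
      .map { case (k, vs) => k -> vs.size }

  def main(args: Array[String]): Unit = {
    println(s"first match: $firstMatchCounts")
    println(s"all matches: $allMatchCounts")
  }
}
```

With the question's data every text contains exactly one key, so both approaches agree. If texts can mention several keys and each should be counted, the UDF would need to return all matches, for example as an array column that is then exploded.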