
Partial/Full-match value in one RDD to values in another RDD

I have two RDDs where the first RDD has records of the form

RDD1 = (1, 2017-2-13,"ABX-3354 gsfette"
        2, 2017-3-18,"TYET-3423 asdsad"
        3, 2017-2-09,"TYET-3423 rewriu"
        4, 2017-2-13,"ABX-3354 42324"
        5, 2017-4-01,"TYET-3423 aerr")

and the second RDD has records of the form

RDD2 = ('mfr1',"ABX-3354")
       ('mfr2',"TYET-3423")

I need to find all the records in RDD1 that fully or partially match each value in RDD2 (matching the 3rd column of RDD1 against the 2nd column of RDD2) and get the count per key.

For this example, the end result would be:

ABX-3354  2
TYET-3423 3

What is the best way to do this?

asked Mar 08 '23 by Gayatri

1 Answer

I am posting a couple of solutions using Spark SQL, focused on accurate pattern matching of the search strings within the given text.

1: Using CrossJoin

import spark.implicits._

val df1 = Seq(
  (1, "2017-2-13", "ABX-3354 gsfette"),
  (2, "2017-3-18", "TYET-3423 asdsad"),
  (3, "2017-2-09", "TYET-3423 rewriu"),
  (4, "2017-2-13", "ABX-335442324"), //changed from "ABX-3354 42324"
  (5, "2017-4-01", "aerrTYET-3423") //changed from "TYET-3423 aerr"
).toDF("id", "dt", "txt")

val df2 = Seq(
  ("mfr1", "ABX-3354"),
  ("mfr2", "TYET-3423")
).toDF("col1", "key")

import org.apache.spark.sql.Row

// match function for the filter: true when the row's txt contains the row's key
def matcher(row: Row): Boolean = row.getAs[String]("txt")
  .contains(row.getAs[String]("key"))

val join = df1.crossJoin(df2)

import org.apache.spark.sql.functions.count

val result = join.filter(matcher _)
  .groupBy("key")
  .agg(count("txt").as("count"))

2: Using Broadcast variable

import spark.implicits._

val df1 = Seq(
  (1, "2017-2-13", "ABX-3354 gsfette"),
  (2, "2017-3-18", "TYET-3423 asdsad"),
  (3, "2017-2-09", "TYET-3423 rewriu"),
  (4, "2017-2-13", "ABX-3354 42324"),
  (5, "2017-4-01", "aerrTYET-3423"),
  (6, "2017-4-01", "aerrYET-3423")
).toDF("id", "dt", "pattern")

// small dataset to broadcast: keep only the key (2nd element) of each pair
val keys = Seq(
  ("mfr1", "ABX-3354"),
  ("mfr2", "TYET-3423")
).map(_._2)

// broadcast lookup to use inside the UDF
val lookup = spark.sparkContext.broadcast(keys)

// UDF: return the first broadcast key contained in the text, or null if none match
import org.apache.spark.sql.functions._
val matcher = udf((txt: String) => lookup.value.find(txt.contains(_)).orNull)

val result = df1.withColumn("match", matcher($"pattern"))
  .filter($"match".isNotNull) // drop non-matching records
  .groupBy("match")
  .agg(count("pattern").as("count"))

Both solutions produce the same counts (in solution 2 the output column is named match rather than key):

result.show()

+---------+-----+
|      key|count|
+---------+-----+
|TYET-3423|    3|
| ABX-3354|    2|
+---------+-----+
answered Mar 11 '23 by mrsrinivas