
Pattern matching - spark scala RDD

I am new to Spark and Scala, coming from an R background. After a few transformations of an RDD, I get an RDD of type

Description: RDD[(String, Int)]

Now I want to apply a regular expression to the String part of the RDD, extract substrings from it, and add each substring as a new column.

Input Data :

BMW 1er Model,278
MINI Cooper Model,248

Output I am looking for :

Input                  | Brand | Series
BMW 1er Model,278      | BMW   | 1er
MINI Cooper Model,248  | MINI  | Cooper

where Brand and Series are newly calculated substrings from String RDD

What I have done so far.

I can achieve this for a single String using a regular expression, but I can't apply it to all lines.

 val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r //to look for BMW or MINI

Then I can use

brandRegEx.findFirstIn("hello this mini is bmW testing")

But how can I use it on all the lines of the RDD, and apply different regular expressions to get the output shown above?

I read about this code snippet, but I am not sure how to put it all together.

val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r

def getBrand(Col4: String) : String = Col4 match {
    case brandRegEx(str)  =>  
    case _ => ""
    return 'substring
}

Any help would be appreciated !

Thanks

user3560220 asked Feb 09 '23 10:02



1 Answer

To apply your regex to each item in the RDD, use the RDD's map function, which transforms each row using a function you supply. Here that function is a pattern-matching anonymous function (a case block), which extracts the two parts of the tuple that makes up each row:

import org.apache.spark.{SparkContext, SparkConf}

object Example extends App {

  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("Example"))

  val data = Seq(
    ("BMW 1er Model",278),
    ("MINI Cooper Model",248))

  val dataRDD = sc.parallelize(data)

  val processedRDD = dataRDD.map{
    case (inString, inInt) =>
      val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r
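      // findFirstIn returns an Option[String]; fall back to a marker when nothing matches
      val brand = brandRegEx.findFirstIn(inString).getOrElse("NOT FOUND")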
      //val seriesRegEx = ...
      //val series = seriesRegEx.findFirstIn(inString)
      val series = "foo"
      (inString, inInt, brand, series)
  }

  processedRDD.collect().foreach(println)
  sc.stop()
}

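Note that your regular expression has some problems. For example, the MINI alternative starts with a dot, so it needs at least one character before MINI and cannot match a string that begins with MINI. You also need a second regular expression to find the series. This code outputs: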

(BMW 1er Model,278,BMW,foo)
(MINI Cooper Model,248,NOT FOUND,foo)

But if you correct your regexes for your needs, this is how you can apply them to each row.
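As a rough illustration of what corrected regexes could look like, here is a minimal sketch in plain Scala (no Spark needed to try it). It makes several assumptions that are not in the question: the only brands are BMW and MINI, the series is simply the next whitespace-delimited token after the brand, and the names BrandSeriesSketch, brandSeries and brandAndSeries are made up for this example. It also uses the regex as a pattern-match extractor, which is what the getBrand snippet in the question was trying to do.

```scala
import scala.util.matching.Regex

object BrandSeriesSketch {
  // (?i) makes the match case-insensitive, so [Bb][Mm][Ww] is not needed.
  // Group 1 is the brand, group 2 is the token right after it (assumed to be the series).
  // .unanchored lets the extractor match anywhere in the string, not only the whole string.
  val brandSeries = """(?i)\b(BMW|MINI)\s+(\S+)""".r.unanchored

  // Returns (brand, series), or two empty strings when no known brand is found.
  def brandAndSeries(s: String): (String, String) = s match {
    case brandSeries(brand, series) => (brand, series)
    case _                          => ("", "")
  }

  def main(args: Array[String]): Unit = {
    println(brandAndSeries("BMW 1er Model"))     // (BMW,1er)
    println(brandAndSeries("MINI Cooper Model")) // (MINI,Cooper)
  }
}
```

Inside the map over dataRDD, the body of the case could then become something like val (brand, series) = BrandSeriesSketch.brandAndSeries(inString) followed by (inString, inInt, brand, series). Real model names will likely need a more careful pattern than "the next word".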

mattinbits answered Feb 17 '23 14:02
