 

RDD filter in Scala Spark

I have a dataset and I want to extract the review/text entries whose review/time lies between x and y, for example (1183334400 < time < 1185926400).

Here is part of my data:

product/productId: B000278ADA
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
product/price: 46.34
review/userId: A17KXW1PCUAIIN
review/profileName: Mark Anthony "Mark"
review/helpfulness: 4/4
review/score: 5.0
review/time: 1174435200
review/summary: Jobst UltraSheer Knee High Stockings
review/text: Does a very good job of relieving fatigue.

product/productId: B000278ADB
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
product/price: 46.34
review/userId: A9Q3932GX4FX8
review/profileName: Trina Wehle
review/helpfulness: 1/1
review/score: 3.0
review/time: 1352505600
review/summary: Delivery was very long wait.....
review/text: It took almost 3 weeks to recieve the two pairs of stockings .

product/productId: B000278ADB
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
product/price: 46.34
review/userId: AUIZ1GNBTG5OB
review/profileName: dgodoy
review/helpfulness: 1/1
review/score: 2.0
review/time: 1287014400
review/summary: sizes recomended in the size chart are not real
review/text: sizes are much smaller than what is recomended in the chart. I tried to put it and sheer it!.

My Spark/Scala code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object test1 {
  def main(args: Array[String]): Unit = {
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local")
    val sc = new SparkContext(conf1)
    val conf: Configuration = new Configuration
    conf.set("textinputformat.record.delimiter", "product/title:")
    val input1 = sc.newAPIHadoopFile("data/Electronics.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
    val lines = input1.map { text => text._2 }
    // problem line: contains expects a substring, not a predicate, and startdate/enddate are not defined
    val filt = lines.filter(text => text.toString.contains(tt => tt in (startdate until enddate)))
    filt.saveAsTextFile("data/filter1")
  }
}

But my code does not work well.

How can I filter these lines?

Esmaeil zahedi asked Apr 20 '15 14:04


People also ask

What is the function of filter() in Spark?

In Spark, the Filter function returns a new dataset formed by selecting those elements of the source on which the function returns true. So, it retrieves only the elements that satisfy the given condition.
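A minimal sketch in Scala (assuming a SparkContext named sc, as in the question) of filter() keeping only the elements for which the predicate returns true:

val nums = sc.parallelize(Seq(1, 2, 3, 4, 5, 6))
val evens = nums.filter(n => n % 2 == 0)   // transformation: builds a new RDD, leaves the source untouched
evens.collect()                            // Array(2, 4, 6)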

What is RDD in Spark Scala?

Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark. RDDs are immutable, distributed collections of objects of any type. As the name suggests, an RDD is a resilient (fault-tolerant) collection of records that resides on multiple nodes.
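For example, a local collection can be turned into an RDD with sc.parallelize (a minimal sketch, again assuming a SparkContext named sc):

val rdd = sc.parallelize(Seq("a", "b", "c"), numSlices = 2)   // immutable, split across 2 partitions
rdd.count()                                                    // action: returns 3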

How do I filter rows in PySpark RDD?

PySpark's filter() function is used to filter rows from an RDD or DataFrame based on a given condition or SQL expression. You can also use where() instead of filter() if you are coming from an SQL background; both functions operate exactly the same.
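The Scala DataFrame API offers the same pair of methods, so the idea carries over directly. A minimal sketch, assuming a SparkSession named spark and made-up sample values in a time column:

import spark.implicits._

val reviews = Seq(1174435200L, 1184000000L, 1352505600L).toDF("time")   // illustrative values only
reviews.filter($"time" > 1183334400L && $"time" < 1185926400L).show()   // keeps only 1184000000
reviews.where("time > 1183334400 AND time < 1185926400").show()         // same result via where()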

What are the two types of RDD operations?

RDD Operations. RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.
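A minimal sketch of the distinction, reusing the question's file path: transformations are lazy and only describe the computation, while an action triggers it:

val lengths = sc.textFile("data/Electronics.txt").map(_.length)   // transformation: no job runs yet
val total = lengths.reduce(_ + _)                                  // action: runs the job and returns a value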


1 Answer

It is much simpler than that. Try this:

import org.apache.spark.{SparkConf, SparkContext}

object test1
{
  def main(args: Array[String]): Unit =
  {
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local")
    val sc = new SparkContext(conf1)

    // Assumes startDate and endDate (epoch seconds) are defined somewhere in scope.
    // Takes the substring between "review/time: " and "review/text: " and compares it as a Long.
    def extractDateAndCompare(line: String): Boolean =
    {
        val from = line.indexOf("/time: ") + 7          // "/time: " is 7 characters long
        val to = line.indexOf("review/text: ") - 1
        val date = line.substring(from, to).toLong
        date > startDate && date < endDate
    }

    sc.textFile("data/Electronics.txt")
        .filter(extractDateAndCompare)
        .saveAsTextFile("data/filter1")
  }
}

I usually find that those intermediate auxiliary methods make things much clearer. Of course, this assumes the boundary dates are defined somewhere and that the input file contains no format issues. I did this intentionally to keep it simple, but wrapping the parsing in a Try, returning an Option, and using flatMap() can help you avoid errors if you have them.
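One possible sketch of that variant (not part of the original answer): parse the date inside a Try, turn failures into None, and let flatMap() drop the lines that could not be parsed. startDate and endDate are still assumed to be defined elsewhere:

def extractDate(line: String): Option[Long] =
{
    val from = line.indexOf("/time: ") + 7
    val to = line.indexOf("review/text: ") - 1
    scala.util.Try(line.substring(from, to).trim.toLong).toOption   // None if the line is malformed
}

sc.textFile("data/Electronics.txt")
    .flatMap(line => extractDate(line).filter(d => d > startDate && d < endDate).map(_ => line))
    .saveAsTextFile("data/filter1")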

Also, your raw text format is a little cumbersome; you might want to explore JSON, TSV files, or some other easier-to-parse alternative.
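For instance, if each review were flattened to one tab-separated line with the epoch time in a known column (column index 3 here, purely hypothetical), the whole filter would collapse to a split and a comparison:

sc.textFile("data/reviews.tsv")                       // hypothetical TSV version of the data
    .filter { line =>
        val time = line.split("\t")(3).toLong         // the column index is an assumption
        time > startDate && time < endDate
    }
    .saveAsTextFile("data/filter1")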

Daniel Langdon answered Oct 30 '22 16:10