I have a dataset and I want to extract the records (review/text) whose (review/time) falls between x and y, for example (1183334400 < time < 1185926400).
Here is part of my data:
product/productId: B000278ADA
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
product/price: 46.34
review/userId: A17KXW1PCUAIIN
review/profileName: Mark Anthony "Mark"
review/helpfulness: 4/4
review/score: 5.0
review/time: 1174435200
review/summary: Jobst UltraSheer Knee High Stockings
review/text: Does a very good job of relieving fatigue.
product/productId: B000278ADB
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
product/price: 46.34
review/userId: A9Q3932GX4FX8
review/profileName: Trina Wehle
review/helpfulness: 1/1
review/score: 3.0
review/time: 1352505600
review/summary: Delivery was very long wait.....
review/text: It took almost 3 weeks to recieve the two pairs of stockings .
product/productId: B000278ADB
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
product/price: 46.34
review/userId: AUIZ1GNBTG5OB
review/profileName: dgodoy
review/helpfulness: 1/1
review/score: 2.0
review/time: 1287014400
review/summary: sizes recomended in the size chart are not real
review/text: sizes are much smaller than what is recomended in the chart. I tried to put it and sheer it!.
My Spark-Scala code:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
object test1 {
  def main(args: Array[String]): Unit = {
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local")
    val sc = new SparkContext(conf1)
    val conf: Configuration = new Configuration
    conf.set("textinputformat.record.delimiter", "product/title:")
    val input1 = sc.newAPIHadoopFile("data/Electronics.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
    val lines = input1.map { text => text._2 }
    val filt = lines.filter(text => (text.toString.contains(tt => tt in (startdate until enddate))))
    filt.saveAsTextFile("data/filter1")
  }
}
But my code does not work well. How can I filter these lines?
In Spark, the filter() function returns a new dataset formed by selecting those elements of the source on which the given function returns true, so it retrieves only the elements that satisfy the condition.
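For example, a minimal sketch of filter() on an RDD (this assumes a SparkContext named sc is already in scope):

val numbers = sc.parallelize(Seq(1, 2, 3, 4, 5, 6))
val evens = numbers.filter(n => n % 2 == 0) // keep only elements where the predicate is true
evens.collect().foreach(println)            // prints 2, 4, 6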
The Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark. RDDs are immutable, distributed collections of objects of any type; as the name suggests, an RDD is a resilient (fault-tolerant) collection of records that resides on multiple nodes.
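A small illustration of that immutability (again assuming an existing SparkContext sc): transformations never modify an RDD, they return a new one.

val original = sc.parallelize(Seq("a", "b", "c"))
val upper = original.map(_.toUpperCase) // builds a new RDD; `original` is unchanged
println(original.count())               // 3
println(upper.first())                  // "A"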
PySpark's filter() function is used to filter rows from an RDD or DataFrame based on a given condition or SQL expression. If you are coming from an SQL background, you can also use where() instead of filter(); both functions operate exactly the same.
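A rough Scala equivalent of that filter()/where() point (the paragraph above talks about PySpark, but the DataFrame API is the same); the SparkSession and the toy DataFrame here are only for the sketch:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("filter-sketch").master("local").getOrCreate()
import spark.implicits._

val df = Seq((1174435200L, 5.0), (1352505600L, 3.0)).toDF("time", "score")
df.filter($"time" > 1183334400L && $"time" < 1185926400L).show() // filter()
df.where("time > 1183334400 AND time < 1185926400").show()       // where(): same result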
RDD Operations. RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.
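A tiny sketch of that transformation/action split, again assuming an existing SparkContext sc; nothing is computed until an action is called:

val lines = sc.textFile("data/Electronics.txt")           // lazily defines an RDD over the file
val timeLines = lines.filter(_.startsWith("review/time")) // transformation: still lazy
println(timeLines.count())                                // action: triggers the actual job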
It is much simpler than that. Try this:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object test1 {
  def main(args: Array[String]): Unit = {
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local")
    val sc = new SparkContext(conf1)

    // Boundary timestamps (Unix epoch seconds); set them to the range you need.
    val startDate = 1183334400L
    val endDate = 1185926400L

    // True when the record's review/time value falls inside the range.
    def extractDateAndCompare(record: String): Boolean = {
      val start = record.indexOf("review/time: ") + "review/time: ".length
      val end = record.indexOf('\n', start) // the timestamp ends at the end of its line
      val date = record.substring(start, end).trim.toLong
      date > startDate && date < endDate
    }

    // Split the file into whole review blocks (the same custom-delimiter trick as in
    // your own code), so each record contains both review/time and review/text.
    val hadoopConf = new Configuration()
    hadoopConf.set("textinputformat.record.delimiter", "product/title:")
    val records = sc.newAPIHadoopFile("data/Electronics.txt", classOf[TextInputFormat],
        classOf[LongWritable], classOf[Text], hadoopConf)
      .map(_._2.toString)
      .filter(_.contains("review/time: ")) // drop the fragment before the first delimiter

    records.filter(extractDateAndCompare).saveAsTextFile("data/filter1")
  }
}
I usually find that intermediate auxiliary methods like this make things much clearer. Of course, this assumes the boundary dates hold the range you actually want and that the input file contains no format issues. I kept it simple intentionally, but wrapping the parsing in a Try, returning an Option, and using flatMap() can help you avoid errors if you have them.
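A rough sketch of that Try/Option/flatMap idea, reusing the records, startDate and endDate values from the snippet above (the helper name and the output path are just illustrative):

import scala.util.Try

// Returns Some(timestamp) when a review/time field can be parsed, None otherwise.
def extractDate(record: String): Option[Long] = {
  val marker = "review/time: "
  val from = record.indexOf(marker)
  if (from < 0) None
  else {
    val start = from + marker.length
    val end = record.indexOf('\n', start)
    Try(record.substring(start, if (end < 0) record.length else end).trim.toLong).toOption
  }
}

// Records that fail to parse are silently dropped instead of crashing the job.
records
  .flatMap(r => extractDate(r).filter(d => d > startDate && d < endDate).map(_ => r).toList)
  .saveAsTextFile("data/filter_safe")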
Also, your raw text format is a little cumbersome; you might want to explore JSON, TSV, or some other easier alternative.
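For instance, if each review were exported as one tab-separated line (productId, time, text) the whole job collapses to something like the following (the file name and column order here are hypothetical, and startDate/endDate come from the snippet above):

sc.textFile("data/Electronics.tsv")                                // hypothetical TSV export
  .map(_.split("\t"))
  .filter(f => f(1).toLong > startDate && f(1).toLong < endDate)   // column 1 = review/time
  .map(f => f(2))                                                  // column 2 = review/text
  .saveAsTextFile("data/filter_tsv")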