This is my text file, which is the input to the program:
Id Title Copy
B2002010 gyh 1
D2001001 abc 12
M2003005 zxc 3
D2002003 qwe 13
M2001002 efg 1
D2001004 asd 6
D2003005 zxc 3
M2001006 wer 6
D2001006 wer 6
B2004008 sxc 10
D2002007 sdf 9
D2004008 sxc 10
ID is formatted as Xyyyyrrr where:

X is B => Book or M => Magazine
yyyy is the year
rrr is a random number

What I have to do is obtain the total number of copies of books or magazines that are from the same year. In addition, I need a small data cleansing step for the "Copy" column: if I find something other than a number, I replace it with a '0'.
My Spark project is in Eclipse, and I am using Maven and the Scala IDE. I need to use a map/reduce approach.
I have started with the map function, which splits the text file.
This is the code I started:
package bd.spark_app

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object alla {
  def main(args: Array[String]) = {
    val conf = new SparkConf().setMaster("local").setAppName("trying")
    val sc = new SparkContext(conf)

    // each line splits into: r(0) = id, r(1) = title, r(2) = copy count
    val x = sc.textFile("/home/hadoopusr/sampledata")

    // dropRight(3) strips the random rrr suffix, leaving the Xyyyy key
    x.map(_.split(" ")).foreach(r =>
      println(r(0).dropRight(3), r(2))
    )

    sc.stop()
  }
}
This is the result of the map function shown above:
(B2002,1)
(D2001,12)
(M2003,3)
(D2002,13)
(M2001,1)
(D2001,6)
(D2003,3)
(M2001,6)
(D2001,6)
(B2004,10)
(D2002,9)
(D2004,10)
I just need some sort of reduce function that groups the books and magazines from the same year, adds their copy counts together, and checks that the values in the "Copy" column are numbers.
Example: with records (B2002,12) and (B2002,16) the result should be (B2002,28).
The method "reduceByKey" can be used. To cover the data-cleansing requirement as well, any value in the "Copy" column that is not a number is replaced with 0 via scala.util.Try before summing:

import scala.util.Try

val converted = x.map(_.split(" ")).map(r => (r(0).dropRight(3), Try(r(2).toInt).getOrElse(0)))
val result = converted.reduceByKey(_ + _)
Output:
(M2001,7)
(D2001,24)
(M2003,3)
(D2003,3)
(D2002,22)
(D2004,10)
(B2002,1)
(B2004,10)
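With the Try-based cleansing, a hypothetical malformed row such as "D2001009 foo x" would contribute (D2001,0) instead of throwing a NumberFormatException.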
Note: the input file is plain delimited text, so it can also be read with "spark.read" (csv format with a space separator) and processed as a DataFrame instead of an RDD.
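Here is a minimal sketch of that DataFrame approach, assuming a SparkSession named "spark" and that the header line "Id Title Copy" is actually present in the file (both are assumptions, not part of the original code):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local").appName("trying").getOrCreate()

// read the space-separated file; the "header" option assumes the "Id Title Copy" line exists
val df = spark.read
  .option("header", "true")
  .option("sep", " ")
  .csv("/home/hadoopusr/sampledata")

// key = first five characters of the id (Xyyyy); casting "Copy" to int turns
// non-numeric values into null, and coalesce replaces null with 0 (the cleansing step)
val result = df
  .select(
    substring(col("Id"), 1, 5).as("key"),
    coalesce(col("Copy").cast("int"), lit(0)).as("copies"))
  .groupBy("key")
  .agg(sum("copies").as("total"))

result.show()

The cast-plus-coalesce pattern expresses the same cleansing rule as the Try in the RDD version, but lets Spark's optimizer handle the aggregation.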