 

Data Manipulation in Spark

I am new to Spark and I am trying to achieve a certain data manipulation based on counts. The problem is this: I have a text file with information that looks like this:

john, apple 
john, apple
john, orange
jill, apple
jill, orange
jill, orange

What I want to do is simple: count the number of times each fruit appears for each person and divide that count by the total number of that fruit across both persons. So the outcome would look like this:

john, apple, 2, 3
jill, apple, 1, 3
john, orange, 1, 3
jill, orange, 2, 3

Then I can divide column 3 by column 4 for this final product:

john, apple, 2, 3, 2/3
jill, apple, 1, 3, 1/3
john, orange, 1, 3, 1/3
jill, orange, 2, 3, 2/3
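Before reaching for Spark, the intended computation can be sketched locally on plain Scala collections (no cluster needed), using the sample data above:

```scala
// The sample (person, fruit) pairs from the question
val pairs = Seq(
  ("john", "apple"), ("john", "apple"), ("john", "orange"),
  ("jill", "apple"), ("jill", "orange"), ("jill", "orange")
)

// count per (person, fruit) pair
val counts = pairs.groupBy(identity).map { case (k, v) => k -> v.size }

// total per fruit across both persons
val totals = pairs.groupBy(_._2).map { case (k, v) => k -> v.size }

// divide each count by its fruit's total
val result = counts.map { case ((person, fruit), c) =>
  (person, fruit, c, totals(fruit), c.toDouble / totals(fruit))
}
```

This mirrors the count / total / fraction columns of the desired output; the Spark versions below distribute the same grouping steps.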

I have tried a few things in Scala, like this:

val persons = sc.textFile("path_to_directory").map(_.split(",")).map(x => (x(0), x(1)))
persons.map { case (person, fruit) => ((person, fruit), 1) }.reduceByKey(_ + _).collect

The output of this is:

((jill,orange),2)
((jill,apple),1)
((john,orange),1)
((john,apple),2)

This seems like a good start but then I don't know how to proceed from here. Any help or hints would be greatly appreciated!

UPDATE:

I have a proposed solution for this problem:

val persons = sc.textFile("path_to_directory").map(_.split(",")).map(x => (x(0), x(1)))

// count per (name, fruit) pair
val count = persons.map { case (name, fruit) => ((name, fruit), 1) }.reduceByKey(_ + _)

// total per fruit across all persons
val total = persons.map { case (name, fruit) => (fruit, 1) }.reduceByKey(_ + _)

// re-key the counts by fruit so they can be joined with the totals
val fruit = count.map { case ((name, fruit), count) => (fruit, (name, count)) }

fruit.join(total).map { case (fruit, ((name, count), total)) => (name, fruit, count, total, count.toDouble / total.toDouble) }.collect.foreach(println)

The output of this Scala code in Spark is:

(jill,orange,2,3,0.6666666666666666)
(john,orange,1,3,0.3333333333333333)
(jill,apple,1,3,0.3333333333333333)
(john,apple,2,3,0.6666666666666666)
RDizzl3 asked Nov 16 '25

1 Answer

One possible solution:

def getFreqs(x: String, vals: Iterable[String]) = {
    val counts = vals.groupBy(identity).mapValues(_.size)
    val sum = counts.values.sum.toDouble
    counts.map { case (k, v) => (x, k, v, sum.toInt, v / sum) }
}

persons.groupByKey.flatMap { case(k, v) => getFreqs(k, v) }
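Since getFreqs uses only plain Scala collections (no Spark types), it can be sanity-checked locally. A small sketch using john's fruits from the question:

```scala
// getFreqs as defined above
def getFreqs(x: String, vals: Iterable[String]) = {
  val counts = vals.groupBy(identity).mapValues(_.size)
  val sum = counts.values.sum.toDouble
  counts.map { case (k, v) => (x, k, v, sum.toInt, v / sum) }
}

// john has two apples and one orange
val johns = getFreqs("john", Seq("apple", "apple", "orange")).toSet
```

Note that the denominator here is the number of fruits per person; in the sample data this coincides with the per-fruit total used in the question's own solution (both are 3).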

And another:

val fruitsPerPerson = sc.broadcast(persons.countByKey)

persons.groupBy(identity).map { case (k, v) => {
    val sum: Float = fruitsPerPerson.value.get(k._1) match {
        case Some(x) => x
        case _ => 1
    }
    (k._1, k._2, v.size, sum.toInt, v.size / sum) 
}}
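Here countByKey brings a Map of per-person totals back to the driver (for the sample data, "john" -> 3 and "jill" -> 3), which is then broadcast. The per-element arithmetic can be sketched locally, simulating that map with a groupBy:

```scala
// The sample (person, fruit) pairs from the question
val pairs = Seq(
  ("john", "apple"), ("john", "apple"), ("john", "orange"),
  ("jill", "apple"), ("jill", "orange"), ("jill", "orange")
)

// a local stand-in for what persons.countByKey would return: fruits per person
val fruitsPerPerson = pairs.groupBy(_._1).map { case (k, v) => k -> v.size.toLong }

val result = pairs.groupBy(identity).map { case (k, v) =>
  val sum: Float = fruitsPerPerson.getOrElse(k._1, 1L).toFloat
  (k._1, k._2, v.size, sum.toInt, v.size / sum)
}
```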

Both groupByKey and groupBy can be rather inefficient, so if you're looking for a more robust solution you may consider using combineByKey:

def create(value: String) = Map(value -> 1)

def mergeVals(x: Map[String, Int], value: String) = {
    val count = x.getOrElse(value, 0) + 1
    x ++ Map(value -> count)
}

def mergeCombs(x: Map[String, Int], y: Map[String, Int]) = {
    val keys = x.keys ++ y.keys
    keys.map((k: String) => (k -> (x.getOrElse(k, 0) + y.getOrElse(k, 0)))).toMap
}

val counts = persons.combineByKey(create, mergeVals, mergeCombs)

counts.flatMap { case (x: String, counts: Map[String, Int]) =>  {
    val sum = counts.values.sum.toDouble
    counts.map { case (k: String, v: Int) => (x, k, v, sum.toInt, v / sum) }
}}
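Because create, mergeVals, and mergeCombs are ordinary functions, the combineByKey contract can be simulated locally: fold each partition's values starting from create, then merge the per-partition results with mergeCombs. A sketch, assuming john's fruits happen to land on two partitions:

```scala
def create(value: String) = Map(value -> 1)

def mergeVals(x: Map[String, Int], value: String) =
  x ++ Map(value -> (x.getOrElse(value, 0) + 1))

def mergeCombs(x: Map[String, Int], y: Map[String, Int]) =
  (x.keys ++ y.keys).map(k => k -> (x.getOrElse(k, 0) + y.getOrElse(k, 0))).toMap

// partition 1 holds ("apple", "apple"), partition 2 holds ("orange")
val c1 = List("apple").foldLeft(create("apple"))(mergeVals)
val c2 = create("orange")
val merged = mergeCombs(c1, c2)
```

merged is Map("apple" -> 2, "orange" -> 1), i.e. the same per-person counts the groupByKey version computes, but built incrementally without materializing all values for a key at once.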
zero323 answered Nov 19 '25