 

reduceByKey using Scala object as key

I'm using Spark with Scala and I have an RDD full of Tuple2 containing a complex object as the key and a Double. The aim is to add the Doubles (the frequencies) when the objects are identical.

For that I've defined my object as follows:

    case class SimpleCoocurrence(word:String, word_pos:String, cooc:String, cooc_pos:String, distance:Double) extends Ordered[SimpleCoocurrence]{
      def compare(that: SimpleCoocurrence) = {
        if(this.word.equals(that.word)&&this.word_pos.equals(that.word_pos)
           &&this.cooc.equals(that.cooc)&&this.cooc_pos.equals(that.cooc_pos))
          0
        else
          this.toString.compareTo(that.toString)
      }
    }

Now I'm trying to use reduceByKey like this:

val coocRDD = sc.parallelize(coocList)
println(coocRDD.count)
val reduced = coocRDD.reduceByKey(_ + _)
println(reduced.count)

But the result shows that the RDD contains exactly the same number of elements before and after the reduceByKey.

How can I perform a reduceByKey using Tuple2[SimpleCoocurrence, Double]? Is implementing the Ordered trait the right way to tell Spark how to compare my objects? Should I use only Tuple2[String, Double]?

Thanks,

asked Jan 27 '15 by chrisbtk

1 Answer

reduceByKey does not use Ordering but hashCode and equals to determine which keys are the same. In particular, the HashPartitioner groups keys by hash, so that keys with the same hashCode land on the same partition and further reduction can happen on a per-partition basis.
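
As a minimal sketch (my own example, not compiled or tested), two keys that are equal and share a hashCode are reduced together even though the key type defines no Ordering at all:

    case class DemoKey(word: String, pos: String)  // default equals/hashCode from the case class

    val demoRDD = sc.parallelize(Seq(DemoKey("cat", "NN") -> 1.0, DemoKey("cat", "NN") -> 2.0))

    // Both keys are equal and hash identically, so reduceByKey merges them:
    demoRDD.reduceByKey(_ + _).collect()  // Array((DemoKey(cat,NN), 3.0))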

Case classes have a default implementation of equals and hashCode that covers all of their fields. The test data used probably has different values of the field distance: Double, making each instance a unique object. Using it as a key means that only strictly identical objects are reduced into one.
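
To make that concrete (hypothetical field values, just to illustrate), two instances that differ only in distance are two different keys:

    val a = SimpleCoocurrence("cat", "NN", "dog", "NN", 1.0)
    val b = SimpleCoocurrence("cat", "NN", "dog", "NN", 2.0)

    a == b  // false: distance differs, so reduceByKey keeps them apart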

One way to address this would be to define a key for your case class and an addition method on its companion object, something like this:

case class SimpleCoocurrence(word: String, word_pos: String, cooc: String, cooc_pos: String, distance: Double) extends Serializable {
  // Key built only from the fields that identify a co-occurrence, ignoring distance.
  val key = word + word_pos + cooc + cooc_pos
}
object SimpleCoocurrence {
  val add: (SimpleCoocurrence, SimpleCoocurrence) => SimpleCoocurrence = ???
}

val coocList: List[SimpleCoocurrence] = ???
val coocRDD = sc.parallelize(coocList)
val coocByKey = coocRDD.keyBy(_.key)
val addedCooc = coocByKey.reduceByKey(SimpleCoocurrence.add)

(*) code provided as guiding example - not compiled or tested.
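
Alternatively (my own untested sketch), starting from the original RDD[(SimpleCoocurrence, Double)] in the question, you can key by the four identifying string fields and simply sum the Double frequencies, which is the Tuple2-style approach the question asks about:

    // Assumes coocRDD is the question's RDD[(SimpleCoocurrence, Double)].
    val summed = coocRDD
      .map { case (c, freq) => ((c.word, c.word_pos, c.cooc, c.cooc_pos), freq) }
      .reduceByKey(_ + _)  // adds the frequencies for identical co-occurrences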

answered by maasg