I'm using Spark with Scala, and I have an RDD of Tuple2 values, each containing a complex object as the key and a Double. The aim is to add up the doubles (the frequencies) when the objects are identical.
For that, I've defined my object as follows:
case class SimpleCoocurrence(word: String, word_pos: String, cooc: String, cooc_pos: String, distance: Double) extends Ordered[SimpleCoocurrence] {
  def compare(that: SimpleCoocurrence) = {
    if (this.word.equals(that.word) && this.word_pos.equals(that.word_pos)
        && this.cooc.equals(that.cooc) && this.cooc_pos.equals(that.cooc_pos))
      0
    else
      this.toString.compareTo(that.toString)
  }
}
Now I'm trying to use reduceByKey like this:
val coocRDD = sc.parallelize(coocList)
println(coocRDD.count)
coocRDD.map(tup => tup).reduceByKey(_ + _)
println(coocRDD.count)
But the output shows that the RDD contains exactly the same number of elements before and after the reduceByKey.
How can I perform a reduceByKey on Tuple2[SimpleCoocurrence, Double]? Is implementing the Ordered trait the right way to tell Spark how to compare my objects? Should I use only Tuple2[String, Double] instead?
Thanks,
reduceByKey does not use an Ordering but hashCode and equals to determine which keys are the same. In particular, the HashPartitioner groups keys by hash, so that keys with the same hashCode fall on the same partition and the reduction can then happen within each partition.
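To illustrate the placement rule, here is a minimal plain-Scala sketch (no Spark dependency) of how a hash partitioner assigns keys. It assumes the rule Spark's HashPartitioner applies: a null key goes to partition 0, any other key to the non-negative remainder of key.hashCode modulo the number of partitions. The helper name partitionFor and the sample keys are hypothetical. Equal keys have equal hash codes, so they always end up in the same partition.

```scala
object HashPartitionDemo {
  // Hypothetical helper mirroring the non-negative modulo rule of hash partitioning.
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw // keep the index in [0, numPartitions)
    }

  val numPartitions = 4
  val keys = List("apple", "pear", "apple", "plum", "pear") // hypothetical keys

  // Group keys by the partition they would be sent to; duplicates share a partition.
  val byPartition: Map[Int, List[String]] = keys.groupBy(k => partitionFor(k, numPartitions))

  def main(args: Array[String]): Unit =
    byPartition.toList.sortBy(_._1).foreach { case (p, ks) => println(s"partition $p -> $ks") }
}
```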
Case classes have default implementations of equals and hashCode that take every constructor field into account, including distance: Double. Probably the test data has different values in distance, making each instance a unique object. Using the whole object as the key therefore only merges objects that are identical in all five fields, whatever compare returns.
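A minimal plain-Scala sketch (no Spark dependency) of why the Ordered trait does not help: it reproduces the question's class, builds hypothetical sample values, and uses groupBy as a stand-in for reduceByKey, since both rely on equals and hashCode. Two instances that differ only in distance compare as 0, yet they are not equal and stay in separate groups.

```scala
object CaseClassEqualityDemo {
  // The question's class: compare ignores `distance`, but the generated equals/hashCode do not.
  case class SimpleCoocurrence(word: String, word_pos: String, cooc: String, cooc_pos: String, distance: Double)
      extends Ordered[SimpleCoocurrence] {
    def compare(that: SimpleCoocurrence): Int =
      if (word == that.word && word_pos == that.word_pos && cooc == that.cooc && cooc_pos == that.cooc_pos) 0
      else this.toString.compareTo(that.toString)
  }

  // Hypothetical sample values.
  val a = SimpleCoocurrence("cat", "NN", "black", "JJ", 1.0)
  val b = SimpleCoocurrence("cat", "NN", "black", "JJ", 2.0) // same words, different distance
  val c = SimpleCoocurrence("cat", "NN", "black", "JJ", 1.0) // identical to a

  // Stand-in for reduceByKey: group by the whole object, then sum the frequencies.
  val pairs = List(a -> 3.0, b -> 5.0, c -> 1.0)
  val reduced: Map[SimpleCoocurrence, Double] =
    pairs.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    println(a.compare(b) == 0) // Ordered considers a and b the same
    println(a == b)            // equals does not
    println(reduced.size)      // only a and c were merged
  }
}
```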
One way to address this would be to define a key for your case class and an addition function on its companion object, something like this:
case class SimpleCoocurrence(word: String, word_pos: String, cooc: String, cooc_pos: String, distance: Double) extends Serializable {
  // Tuple key: equal only when all four fields match, with no risk of
  // concatenation collisions such as "ab" + "c" == "a" + "bc"
  val key = (word, word_pos, cooc, cooc_pos)
}
object SimpleCoocurrence {
  // Combines two co-occurrences that share the same key (left to be implemented)
  val add: (SimpleCoocurrence, SimpleCoocurrence) => SimpleCoocurrence = ???
}
val coocList: List[SimpleCoocurrence] = ???
val coocRDD = sc.parallelize(coocList)
val coocByKey = coocRDD.keyBy(_.key)
val addedCooc = coocByKey.reduceByKey(SimpleCoocurrence.add)
(*) Code provided as a guiding example; not compiled or tested.
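On the Tuple2[String, Double] question: the key only needs consistent equals and hashCode, so it does not have to be a String. A tuple of the four text fields works and cannot collide the way plain concatenation can ("ab" + "c" equals "a" + "bc"). The sketch below is a plain-Scala stand-in (no Spark dependency) for keying each (co-occurrence, frequency) pair by those fields and summing the frequencies; the sample data is hypothetical, and the Spark equivalent appears only as a comment. Note also that reduceByKey returns a new RDD and leaves coocRDD unchanged, so the result has to be assigned and counted, rather than counting coocRDD again.

```scala
object KeyedReduceDemo {
  // Same fields as the question's class; no custom ordering is needed.
  case class SimpleCoocurrence(word: String, word_pos: String, cooc: String, cooc_pos: String, distance: Double) {
    // Tuple key: its equals/hashCode depend on these four fields only, not on distance.
    def key: (String, String, String, String) = (word, word_pos, cooc, cooc_pos)
  }

  // Hypothetical sample data: (co-occurrence, frequency) pairs.
  val coocList: List[(SimpleCoocurrence, Double)] = List(
    SimpleCoocurrence("cat", "NN", "black", "JJ", 1.0) -> 2.0,
    SimpleCoocurrence("cat", "NN", "black", "JJ", 3.0) -> 1.5,
    SimpleCoocurrence("dog", "NN", "barks", "VBZ", 1.0) -> 4.0
  )

  // Plain-collection stand-in for, roughly:
  //   val reducedRDD = coocRDD.map { case (c, f) => (c.key, f) }.reduceByKey(_ + _)
  val reduced: Map[(String, String, String, String), Double] =
    coocList
      .map { case (c, f) => (c.key, f) }
      .groupBy(_._1)
      .map { case (k, vs) => k -> vs.map(_._2).reduce(_ + _) }

  def main(args: Array[String]): Unit = {
    println(s"before: ${coocList.size}, after: ${reduced.size}")
    reduced.foreach(println)
  }
}
```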