It's very sad. My Spark version is 2.1.1 and my Scala version is 2.11.
import org.apache.spark.SparkContext._
import com.mufu.wcsa.component.dimension.{DimensionKey, KeyTrait}
import com.mufu.wcsa.log.LogRecord
import org.apache.spark.rdd.RDD

object PV {
  def stat[C <: LogRecord, K <: DimensionKey](statTrait: KeyTrait[C, K], logRecords: RDD[C]): RDD[(K, Int)] = {
    val t = logRecords.map(record => (statTrait.getKey(record), 1)).reduceByKey((x, y) => x + y)
    t
  }
}
I got this error:
[ERROR] /Users/lemanli/work/project/newcma/wcsa/wcsa_my/wcsavistor/src/main/scala/com/mufu/wcsa/component/stat/PV.scala:25: error: value reduceByKey is not a member of org.apache.spark.rdd.RDD[(K, Int)]
[ERROR] val t = logRecords.map(record =>(statTrait.getKey(record),1)).reduceByKey((x,y) => x + y)
There is a trait defined:
trait KeyTrait[C <: LogRecord, K <: DimensionKey] {
  def getKey(c: C): K
}
It compiles now with the fix below. Thanks.
def stat[C <: LogRecord, K <: DimensionKey : ClassTag : Ordering](statTrait: KeyTrait[C, K], logRecords: RDD[C]): RDD[(K, Int)] = {
  val t = logRecords.map(record => (statTrait.getKey(record), 1)).reduceByKey((x, y) => x + y)
  t
}
The key also needs an implicit Ordering[T]:
object ClientStat extends KeyTrait[DetailLogRecord, ClientStat] {
  implicit val clientStatSorting = new Ordering[ClientStat] {
    override def compare(x: ClientStat, y: ClientStat): Int = x.key.compare(y.key)
  }

  def getKey(detailLogRecord: DetailLogRecord): ClientStat = new ClientStat(detailLogRecord)
}
In Spark, reduceByKey is a frequently used transformation that aggregates data. It takes a dataset of key-value pairs (K, V) as input, aggregates the values by key, and produces a dataset of (K, V) pairs as output.
Both reduceByKey and groupByKey are wide transformations, which means both trigger a shuffle. The key difference between them is that reduceByKey performs a map-side combine and groupByKey does not.
Basically, reduce must pull the entire dataset down to a single location because it reduces to one final value, and as an action it returns that result to the driver program. reduceByKey, on the other hand, produces one value per key; since the per-key combining can run locally on each machine first, the result remains a distributed RDD that can have further transformations applied to it.
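A minimal sketch of the difference, assuming a live SparkContext named sc (the values here are illustrative only):

val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))

// reduceByKey: a transformation on RDD[(K, V)] that combines values per key
// on each partition first (map-side combine), then shuffles; the result
// stays distributed as an RDD for further transformations.
val counts: RDD[(String, Int)] = words.map(w => (w, 1)).reduceByKey(_ + _)

// reduce: an action that folds the whole RDD down to a single value and
// returns it to the driver program.
val total: Int = words.map(_ => 1).reduce(_ + _)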
This comes from using a pair RDD function generically. The reduceByKey method is actually a method of the PairRDDFunctions class, which is reached through an implicit conversion from RDD:
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V]
So it requires several implicit typeclasses. Normally when working with simple concrete types, those are already in scope. But you should be able to amend your method to also require those same implicits:
def stat[C <: LogRecord, K <: DimensionKey](statTrait: KeyTrait[C, K], logRecords: RDD[C])(implicit kt: ClassTag[K], ord: Ordering[K])
Or using the newer syntax:
def stat[C <: LogRecord, K <: DimensionKey : ClassTag : Ordering](statTrait: KeyTrait[C, K], logRecords: RDD[C])
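Putting it together, a version of stat that should compile (a sketch assuming the question's LogRecord, DimensionKey, and KeyTrait types are in scope):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object PV {
  // The context bounds K : ClassTag : Ordering bring the implicits that
  // rddToPairRDDFunctions asks for into scope, so reduceByKey resolves.
  // (ClassTag[Int] for the value side is already provided by the library.)
  def stat[C <: LogRecord, K <: DimensionKey : ClassTag : Ordering](
      statTrait: KeyTrait[C, K],
      logRecords: RDD[C]): RDD[(K, Int)] =
    logRecords
      .map(record => (statTrait.getKey(record), 1))
      .reduceByKey(_ + _)
}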
reduceByKey is a method that is only defined on RDDs of tuples, i.e. RDD[(K, V)] ((K, V) is just a convention saying the first element is the key and the second is the value).
Not sure from the example what you are trying to achieve, but you certainly need to convert the values inside the RDD to tuples of two values first, as in the sketch below.
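A tiny illustration with hypothetical values, again assuming a SparkContext named sc:

val plain: RDD[Int] = sc.parallelize(Seq(1, 2, 2, 3))
// plain.reduceByKey(_ + _)               // does not compile: not an RDD[(K, V)]
val paired: RDD[(Int, Int)] = plain.map(n => (n, 1)) // now an RDD of tuples
val counted = paired.reduceByKey(_ + _)   // compiles: counts occurrences per key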