I want to use lambda functions to compute the average by key of a JavaPairRDD<Integer, Double> named pairs. For that reason, I developed the following code:
java.util.function.Function<Double, Tuple2<Double, Integer>> createAcc = x -> new Tuple2<Double, Integer>(x, 1);
BiFunction<Tuple2<Double, Integer>, Double, Tuple2<Double, Integer>> addAndCount = (Tuple2<Double, Integer> x, Double y) -> { return new Tuple2(x._1()+y, x._2()+1 ); };
BiFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>> combine = (Tuple2<Double, Integer> x, Tuple2<Double, Integer> y) -> { return new Tuple2(x._1()+y._1(), x._2()+y._2() ); };
JavaPairRDD<Integer, Tuple2<Double, Integer>> avgCounts = pairs.combineByKey(createAcc, addAndCount, combine);
However, Eclipse displays this error:
The method combineByKey(Function<Double,C>, Function2<C,Double,C>, Function2<C,C,C>) in the type JavaPairRDD<Integer,Double> is not applicable for the arguments (Function<Double,Tuple2<Double,Integer>>,
BiFunction<Tuple2<Double,Integer>,Double,Tuple2<Double,Integer>>, BiFunction<Tuple2<Double,Integer>,Tuple2<Double,Integer>,Tuple2<Double,Integer>>)
The combineByKey method expects Spark's own function interfaces (org.apache.spark.api.java.function.Function and Function2) rather than java.util.function.Function and BiFunction. So you should write:
org.apache.spark.api.java.function.Function<Double, Tuple2<Double, Integer>> createAcc =
    x -> new Tuple2<Double, Integer>(x, 1);
Function2<Tuple2<Double, Integer>, Double, Tuple2<Double, Integer>> addAndCount =
    (Tuple2<Double, Integer> x, Double y) -> { return new Tuple2<Double, Integer>(x._1() + y, x._2() + 1); };
Function2<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>> combine =
    (Tuple2<Double, Integer> x, Tuple2<Double, Integer> y) -> { return new Tuple2<Double, Integer>(x._1() + y._1(), x._2() + y._2()); };
JavaPairRDD<Integer, Tuple2<Double, Integer>> avgCounts =
pairs.combineByKey(createAcc, addAndCount, combine);
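If you then want the actual averages per key, one way (a minimal sketch; averages is just a name introduced here) is to divide the sum by the count with mapValues:

JavaPairRDD<Integer, Double> averages =
    avgCounts.mapValues(sumAndCount -> sumAndCount._1() / sumAndCount._2());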
The explanation is in the inline comments:
List<List<Integer>> intList = new ArrayList<>();
intList.add(Arrays.asList(1,2));
intList.add(Arrays.asList(5,6));
intList.add(Arrays.asList(3,8));
intList.add(Arrays.asList(5,7));
intList.add(Arrays.asList(3,4));
System.out.println(intList); //[[1, 2], [5, 6], [3, 8], [5, 7], [3, 4]]
JavaRDD<List<Integer>> intRdd = jsc.parallelize(intList); // jsc is an existing JavaSparkContext
// Turn each [key, value] list into a (key, value) pair so that combineByKey can be applied
JavaPairRDD<Integer, Integer> intPairRdd = intRdd.mapToPair(list -> new Tuple2<>(list.get(0), list.get(1)));
JavaPairRDD<Integer, Tuple2<Integer,Integer>> key_ValueSumsAndEncounters = intPairRdd.combineByKey(
/* Lambda 1 argument: the combiner for a newly encountered key
- When key k is encountered for the first time, the combiner returns a
  tuple with content: (value associated with k, 1).
- The second element of the tuple is 1, since the key has been encountered once so far.
- For example, when we encounter (k,v) = (1,4), the combiner returns (4,1),
  so the pair becomes (1,(4,1))
*/
v -> new Tuple2<Integer, Integer>(v, 1)
/* Lambda 2 argument: the merge function for subsequent encounters (2nd and later) of a key in the same partition
- When key k is encountered again, this function returns a tuple with content:
  (sum so far + value associated with k, number of encounters so far + 1)
- For example, when we encounter (1,6) and the result so far for key 1 is (1,(4,1)), this function
  returns (4+6, 1+1) = (10,2), so the pair becomes (1,(10,2))
*/
, (c1, v) -> new Tuple2<Integer,Integer>(c1._1 + v, c1._2 + 1)
/* Lambda 3 argument: the function that merges two combiners for the same key from different partitions
- It returns a tuple with content:
  (sum from combiner1 + sum from combiner2,
   number of encounters from combiner1 + number of encounters from combiner2)
- For example, if we have
  combiner1 from partition1 = (1,(10,2))
  combiner2 from partition2 = (1,(15,4))
  then this function returns (10+15, 2+4) = (25,6), i.e. the pair (1,(25,6))
*/
, (c1, c2) -> new Tuple2<Integer,Integer>(c1._1 + c2._1, c1._2 + c2._2)
);
System.out.println(key_ValueSumsAndEncounters.collect()); //[(1,(2,1)), (3,(12,2)), (5,(13,2))]
// kse stands for (key, (sum, encounters))
JavaRDD<Tuple2<Integer, Double>> key_avg = key_ValueSumsAndEncounters.map(kse -> new Tuple2<Integer, Double>(kse._1, ((double)kse._2._1/kse._2._2)));
System.out.println(key_avg.collect()); //[(1,2.0), (3,6.0), (5,6.5)]
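If you prefer the result to stay a JavaPairRDD instead of a JavaRDD of tuples, a minimal sketch using mapValues (keyAvgPair is just a name introduced here) would be:

JavaPairRDD<Integer, Double> keyAvgPair =
    key_ValueSumsAndEncounters.mapValues(se -> (double) se._1 / se._2);
System.out.println(keyAvgPair.collect()); // expected: [(1,2.0), (3,6.0), (5,6.5)]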