Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark Combinebykey JAVA lambda expression

I want to use lambda function in order to compute the average by key of a (JavaPairRDD<Integer, Double> pairs). For that reason, I developed the following code:

java.util.function.Function<Double, Tuple2<Double, Integer>> createAcc = x -> new Tuple2<Double, Integer>(x, 1);

BiFunction<Tuple2<Double, Integer>, Double, Tuple2<Double, Integer>>  addAndCount = (Tuple2<Double, Integer> x, Double y) -> {  return new Tuple2(x._1()+y, x._2()+1 );   };

BiFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>>  combine = (Tuple2<Double, Integer> x, Tuple2<Double, Integer> y) -> {  return new Tuple2(x._1()+y._1(), x._2()+y._2() );   };

JavaPairRDD<Integer, Tuple2<Double, Integer>> avgCounts = pairs.combineByKey(createAcc, addAndCount, combine);

However, eclipse diplays this error:

The method combineByKey(Function<Double,C>, Function2<C,Double,C>, Function2<C,C,C>) in the type JavaPairRDD<Integer,Double> is not applicable for the arguments (Function<Double,Tuple2<Double,Integer>>,
 BiFunction<Tuple2<Double,Integer>,Double,Tuple2<Double,Integer>>, BiFunction<Tuple2<Double,Integer>,Tuple2<Double,Integer>,Tuple2<Double,Integer>>) 
like image 681
Wassim Avatar asked Mar 02 '15 09:03

Wassim


2 Answers

The combineByKey method expects org.apache.spark.api.java.function.Function2 instead of java.util.function.BiFunction. So either you write:

java.util.function.Function<Double, Tuple2<Double, Integer>> createAcc =
    x -> new Tuple2<Double, Integer>(x, 1);

Function2<Tuple2<Double, Integer>, Double, Tuple2<Double, Integer>>  addAndCount = 
    (Tuple2<Double, Integer> x, Double y) -> {  return new Tuple2(x._1()+y, x._2()+1 );   };

Function2<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>>  combine = 
    (Tuple2<Double, Integer> x, Tuple2<Double, Integer> y) -> {  return new Tuple2(x._1()+y._1(), x._2()+y._2() );   };

JavaPairRDD<Integer, Tuple2<Double, Integer>> avgCounts = 
    pairs.combineByKey(createAcc, addAndCount, combine);
like image 82
G Quintana Avatar answered Oct 05 '22 13:10

G Quintana


Explanation is in inline comments

List<List<Integer>> intList = new ArrayList<>(); 
intList.add(Arrays.asList(1,2));
intList.add(Arrays.asList(5,6));
intList.add(Arrays.asList(3,8));
intList.add(Arrays.asList(5,7));
intList.add(Arrays.asList(3,4));
System.out.println(intList);  //[[1, 2], [5, 6], [3, 8], [5, 7], [3, 4]]

JavaRDD<List<Integer>> intRdd = jsc.parallelize(intList);

JavaPairRDD<Integer, Tuple2<Integer,Integer>> key_ValueSumsAndEncounters = intPairRdd.combineByKey(
            /* Lambda 1 argument: a combiner for newly encountered key
               - When key k is first time encountered, combiner returns 
                 tuple with content: (value associated k, 1).
               - Second element in the tuple is 1, since we have encountered first time.
               - For example, when we encounter (k,v) = (1,4), combiner will return (1,(4,1))
            */ 
            v -> new Tuple2<Integer, Integer>(v, 1)
            /* Lambda 2 argument: a combiner for combining value for subsequent encounters (2nd and afterwards) of key
               - When key k is encountered for 2nd (or more) time, combiner returns
                 tuple with content: (earlier addition result from combiner + value associated with k 
                                 , earlier number of encounters from combiner + 1)
               - For example, when we encounter (1,6) and we have earlier combiner result (1,(4,1)), this combiner 
                 will return (1,(4+6,1+1)) = (1,(10,2))
            */
            , (c1, v) -> new Tuple2<Integer,Integer>(c1._1 + v, c1._2 + 1)
            /* Lambda 3 argument: a combiner for combining two combiners across different partitions 
               - Combiner returns tuple with content:
                 (addition result from combiner1 + addition result from combiner2
                 , number of encounters from combiner1 + number of encounters from combiner2)
               - For example, if we have 
                 combiner1 from partition1 = (1,(10,2))
                 combiner2 from partition2 = (1,(15,4))
                 then this combiner will return (1,(10+15,2+4)) = (1,(25,6))
            */
            , (c1, c2) -> new Tuple2<Integer,Integer>(c1._1 + c2._1, c1._2 + c2._2)
           );

System.out.println(key_ValueSumsAndEncounters.collect()); //[(1,(2,1)), (3,(12,2)), (5,(13,2))]

//kse for (key,(sum,encounters))
JavaRDD<Tuple2<Integer, Double>> key_avg = key_ValueSumsAndEncounters.map(kse -> new Tuple2<Integer, Double>(kse._1, ((double)kse._2._1/kse._2._2)));
System.out.println(key_avg.collect()); //[(1,2.0), (3,6.0), (5,6.5)]
like image 40
Mahesha999 Avatar answered Oct 05 '22 13:10

Mahesha999