I have a JavaPairDStream containing key-value pairs, and I need to convert it into a HashMap. I have done the same with a normal JavaPairRDD by calling its "collectAsMap()" function, and that works, but when I try to do the same on the DStream it fails.
I am trying to achieve this by converting the "JavaPairDStream" into "JavaPairRDD"s with the "foreachRDD" function and then calling "collectAsMap()" on each JavaPairRDD.
Map<String, String> value = new HashMap<String, String>();
value = line.collectAsMap();
// Here "line" is a "JavaPairRDD<String,String>".
It compiles without errors, but when I run the program it fails with the following exception:
java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lscala.Tuple2;
at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:447)
at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:464)
at attempt1.CSV_Spark$3.call(CSV_Spark.java:109)
at attempt1.CSV_Spark$3.call(CSV_Spark.java:1)
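In this stack trace, `[Ljava.lang.Object;` and `[Lscala.Tuple2;` are JVM names for the array types `Object[]` and `scala.Tuple2[]`, so the cast that failed inside collectAsMap() is between two array types. The plain-Java sketch below illustrates that kind of failure. The class and method names are hypothetical, and `String[]` stands in for `Tuple2[]` so that no Spark or Scala jar is needed. It shows that an array allocated as `Object[]` cannot be cast to a more specific array type even when every element has that type.

```java
import java.util.Arrays;

// Illustration of the JVM array descriptors seen in the stack trace.
// String[] is a stand-in for scala.Tuple2[] (assumption for illustration only).
class ArrayCastDemo {

    // JVM name of an array's runtime class, e.g. "[Ljava.lang.Object;"
    static String runtimeName(Object[] array) {
        return array.getClass().getName();
    }

    // An array allocated as Object[] stays Object[] even if all its elements are
    // Strings, so casting it to String[] throws ClassCastException at run time.
    static boolean objectArrayCastFails() {
        Object[] array = new Object[] {"a", "b"};
        try {
            String[] strings = (String[]) array;
            return false; // not reached: the cast above throws
        } catch (ClassCastException e) {
            return true;
        }
    }

    // An array allocated as String[] can be held as Object[] and cast back safely.
    static boolean stringArrayCastSucceeds() {
        Object[] array = new String[] {"a", "b"};
        String[] strings = (String[]) array;
        return Arrays.equals(strings, new String[] {"a", "b"});
    }

    public static void main(String[] args) {
        System.out.println(runtimeName(new Object[0])); // [Ljava.lang.Object;
        System.out.println(runtimeName(new String[0])); // [Ljava.lang.String;
        System.out.println(objectArrayCastFails());     // true
        System.out.println(stringArrayCastSucceeds());  // true
    }
}
```

What matters is the runtime class of the array object, not the types of the elements stored in it.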
I am not sure whether my method is correct. Is there any difference between a normal "JavaPairRDD" and the one obtained through the "foreachRDD" function? Why does the same method work on a normal "JavaPairRDD" but fail on a "JavaPairRDD" obtained by applying "foreachRDD" to a JavaPairDStream? If I am going wrong anywhere, kindly let me know. Also, if there is any other way, please post it here. Thanks.
At compile time, a downcast from Map to HashMap is accepted because HashMap implements Map. Although this compiles without errors, it can throw a ClassCastException at run time if the Map returned by collectAsMap() is not actually a HashMap. To avoid this problem, you could try this:
Code:
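import java.util.HashMap;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// T, K and V are placeholders for your element, key and value types
JavaPairRDD<K, V> javaRDDPair = rddInstance.mapToPair(new PairFunction<T, K, V>() {
    @Override
    public Tuple2<K, V> call(final T value) {
        // operations on value that compute KTypeValue (a K) and VTypeValue (a V)
        return new Tuple2<K, V>(KTypeValue, VTypeValue);
    }
});
// collectAsMap() is declared to return java.util.Map, so copy it instead of casting it
Map<K, V> map = javaRDDPair.collectAsMap();
HashMap<K, V> hmap = new HashMap<K, V>(map);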
Note: rddInstance is an object of type JavaRDD<T>.
Let's say we have a JavaRDD that holds values of type T. By transforming it with mapToPair, we create a JavaPairRDD that holds <K,V> pairs. Now the requirement is to convert this JavaPairRDD into a HashMap for further computations in your application. Call collectAsMap() and assign its result to a Map variable. After that, you can create a HashMap by passing that Map instance to the HashMap constructor.
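To see the difference between casting and copying without a Spark cluster, here is a plain-Java sketch. The class and method names are hypothetical, and an unmodifiable TreeMap view is only a stand-in for the Map that collectAsMap() returns; the point is that a reference typed as Map may hold any Map implementation, so only the copy is guaranteed to give you a HashMap.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Casting a java.util.Map to HashMap vs. copying it into a new HashMap.
class MapCopyDemo {

    // Hypothetical stand-in for the Map returned by javaRDDPair.collectAsMap()
    static Map<String, String> collectedMap() {
        Map<String, String> pairs = new TreeMap<>();
        pairs.put("k1", "v1");
        pairs.put("k2", "v2");
        return Collections.unmodifiableMap(pairs);
    }

    // Compiles because HashMap implements Map, but throws ClassCastException
    // at run time when the object is not actually a HashMap.
    static boolean downcastFails(Map<String, String> map) {
        try {
            HashMap<String, String> hmap = (HashMap<String, String>) map;
            return false; // reached only if map really is a HashMap
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Safe alternative: the HashMap copy constructor accepts any Map.
    static HashMap<String, String> copyToHashMap(Map<String, String> map) {
        return new HashMap<>(map);
    }

    public static void main(String[] args) {
        Map<String, String> map = collectedMap();
        System.out.println(downcastFails(map));  // true
        HashMap<String, String> hmap = copyToHashMap(map);
        System.out.println(hmap.equals(map));    // true
        hmap.put("k3", "v3");                    // the copy is modifiable
        System.out.println(hmap.size());         // 3
    }
}
```

Copying also means later put calls on the HashMap do not depend on whether the source Map supports modification.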