Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

java.lang.IllegalArgumentException: Invalid lambda deserialization

I'm trying to do a spark streaming job with Kafka but I have a problem when I execute my class using Eclipse

In my main class "JavaDirectKafkaWordCount.class" I created my JavaInputDStream with my kafka params and I'm trying to count the number of words readed from kafka topic

    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(ConsumerRecord::value);
    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((i1, i2) -> i1 + i2);
lines.print();
    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }

I'm getting this error

17/11/13 00:20:33 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)java.io.IOException: unexpected exception type 
at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1582)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1154)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)                                Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1148)    Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
at start.JavaDirectKafkaWordCount.$deserializeLambda$(JavaDirectKafkaWordCount.java:1)
... 37 more

How can I solve this problem?

like image 797
Eliane PDC Avatar asked Nov 13 '17 00:11

Eliane PDC


1 Answers

Change this line:

JavaDStream<String> lines = messages.map(ConsumerRecord::value);

to

JavaDStream<String> lines = messages.map(x -> x.value());
like image 85
allanz Avatar answered Oct 09 '22 20:10

allanz