I'm trying to process messages from Kafka via Spark Streaming 1.3.1. The Job is running locally on my PC.
When I start the job and there are no messages to consume, it is working fine. But when I send a message to Kafka, the job logs a NullPointerException
15/05/20 11:22:00 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.NullPointerException
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:54)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:73)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:71)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:71)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/05/20 11:22:00 ERROR Executor: Exception in task 1.0 in stage 4.0 (TID 9)
org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:81)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/05/20 11:22:00 WARN TaskSetManager: Lost task 1.0 in stage 4.0 (TID 9, localhost): org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:81)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/05/20 11:22:00 ERROR TaskSetManager: Task 1 in stage 4.0 failed 1 times; aborting job
15/05/20 11:22:00 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
15/05/20 11:22:00 INFO TaskSchedulerImpl: Cancelling stage 4
15/05/20 11:22:00 INFO DAGScheduler: Stage 4 (foreachRDD at SimpleApp.java:55) failed in 0,018 s
15/05/20 11:22:00 INFO DAGScheduler: Job 4 failed: foreachRDD at SimpleApp.java:55, took 0,026982 s
15/05/20 11:22:00 ERROR JobScheduler: Error running job streaming job 1432113720000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 1 times, most recent failure: Lost task 1.0 in stage 4.0 (TID 9, localhost): org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:81)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Here is the code of the consumer job. The custom decoder 'TraceEventDecoder' simply deserializes the byte array in the Kafka message to an object of type TraceEvent.
public static void main(final String[] args) throws InterruptedException, IOException {
System.out.println("Job started");
SparkConf conf = new SparkConf().setAppName("HandlingTimeFilter");
conf.setMaster("local[10]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "cer:2181");
kafkaParams.put("bootstrap.servers", "cer:9099,lanthan:9099,uran:9099,protactinium:9099");
kafkaParams.put("consumer.id", "SimpleSparkConsumer");
kafkaParams.put("group.id", "consumers");
Set<String> topics = new HashSet<>();
topics.add("traceEvents");
JavaPairInputDStream<String, TraceEvent> stream =
KafkaUtils.createDirectStream(
jssc, String.class, TraceEvent.class,
StringDecoder.class, TraceEventDecoder.class,
kafkaParams, topics);
stream.foreachRDD((Function<JavaPairRDD<String, TraceEvent>, Void>) rdd -> {
Map<String, TraceEvent> events = rdd.collectAsMap();
System.out.println("New RDD call, size=" + events.size());
return null;
});
jssc.start();
Thread.sleep(60000);
jssc.stop();
}
I had this issue with Spark 1.2.0, but upgraded to 1.3.1 and it's working now. I think the issue might be related to missing dependencies though. If you're using Maven you can adjust your pom.xml to build a proper uber jar.
<project>
<groupId>com.databricks.apps.logs</groupId>
<artifactId>log-analyzer</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Databricks Spark Logs Analyzer</name>
<packaging>jar</packaging>
<version>1.0</version>
<repositories>
<repository>
<id>Akka repository</id>
<url>http://repo.akka.io/releases</url>
</repository>
</repositories>
<dependencies>
<dependency> <!-- Spark -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.3.1</version>
<scope>provided</scope>
</dependency>
<dependency> <!-- Spark SQL -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.3.1</version>
<scope>provided</scope>
</dependency>
<dependency> <!-- Spark Streaming -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.3.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.3.1</version>
</dependency>
<dependency> <!-- Command Line Parsing -->
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<finalName>uber-${project.artifactId}-${project.version}</finalName>
</configuration>
</plugin>
</plugins>
</build>
source: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/missing_dependencies_in_jar_files.html (I updated the version above to use Spark 1.3.1 and to include the Kafka dependency)
Is your TraceEventDecoder constructor public, and is there a TraceEventDecoder(VerifiableProperties) version? This error can happen when something wrong is with parameters for your initialization of KafkaDirectStream. I've submitted an issue to Apache with this case: https://issues.apache.org/jira/browse/SPARK-9780
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With