My Spark Streaming code works seamlessly in the Eclipse IDE, but when I run it on a local Spark cluster it fails with org.apache.spark.util.TaskCompletionListenerException.
With spark-submit in client mode the code also runs fine until I start my Kafka producer; as soon as the producer starts, it gives the following error.
I start the local cluster with sh $SPARK_HOME/sbin/start-all.sh and call spark-submit with this script:
#!/bin/sh
SP_SUBMIT=/home/user/spark/bin/spark-submit
DEP_MODE=client
$SP_SUBMIT \
--deploy-mode $DEP_MODE \
--class com.alind.sparkStream.Test \
--master spark://clstr:7077 \
--name alind \
/home/user/jar/com.alind-0.0.1-SNAPSHOT.jar
I get this error as soon as the Spark stream starts receiving messages:
2015-06-29 16:13:56 ERROR JobScheduler:96 - Error running job streaming job 1435574590600 ms.3
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 306.0 failed 1 times, most recent failure: Lost task 0.0 in stage 306.0 (TID 164, localhost): org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2015-06-29 16:13:56 WARN JobProgressListener:71 - Task start for unknown stage 307
2015-06-29 16:13:56 WARN JobProgressListener:71 - Task start for unknown stage 308
2015-06-29 16:13:56 WARN JobProgressListener:71 - Task start for unknown stage 309
2015-06-29 16:13:56 INFO SparkContext:59 - Starting job: foreach at Test.java:428
2015-06-29 16:13:56 INFO MapOutputTrackerMaster:59 - Size of output statuses for shuffle 34 is 84 bytes
2015-06-29 16:13:56 INFO MapOutputTrackerMaster:59 - Size of output statuses for shuffle 35 is 84 bytes
2015-06-29 16:13:56 INFO DAGScheduler:59 - Got job 94 (foreach at Test.java:428) with 2 output partitions (allowLocal=false)
2015-06-29 16:13:56 INFO DAGScheduler:59 - Final stage: Stage 327(foreach at Test.java:428)
2015-06-29 16:13:56 INFO DAGScheduler:59 - Parents of final stage: List(Stage 320, Stage 317, Stage 324, Stage 321, Stage 318, Stage 325, Stage 322, Stage 326, Stage 323, Stage 319)
2015-06-29 16:13:56 INFO ShuffledDStream:59 - Slicing from 1435574619500 ms to 1435574620400 ms (aligned to 1435574619500 ms and 1435574620400 ms)
2015-06-29 16:13:56 INFO DAGScheduler:59 - Missing parents: List(Stage 320, Stage 317, Stage 318, Stage 319)
2015-06-29 16:13:56 INFO DAGScheduler:59 - Submitting Stage 317 (MappedRDD[234] at mapToPair at Test.java:157), which has no missing parents
2015-06-29 16:13:56 INFO MemoryStore:59 - ensureFreeSpace(4024) called with curMem=386851, maxMem=278302556
2015-06-29 16:13:56 INFO MemoryStore:59 - Block broadcast_129 stored as values in memory (estimated size 3.9 KB, free 265.0 MB)
2015-06-29 16:13:56 INFO MemoryStore:59 - ensureFreeSpace(2230) called with curMem=390875, maxMem=278302556
2015-06-29 16:13:56 INFO MemoryStore:59 - Block broadcast_129_piece0 stored as bytes in memory (estimated size 2.2 KB, free 265.0 MB)
2015-06-29 16:13:56 INFO BlockManagerInfo:59 - Added broadcast_129_piece0 in memory on localhost:42836 (size: 2.2 KB, free: 265.3 MB)
2015-06-29 16:13:56 INFO BlockManagerMaster:59 - Updated info of block broadcast_129_piece0
2015-06-29 16:13:56 INFO SparkContext:59 - Created broadcast 129 from getCallSite at DStream.scala:294
2015-06-29 16:13:56 INFO DAGScheduler:59 - Submitting 1 missing tasks from Stage 317 (MappedRDD[234] at mapToPair at Test.java:157)
2015-06-29 16:13:56 INFO TaskSchedulerImpl:59 - Adding task set 317.0 with 1 tasks
2015-06-29 16:13:56 INFO TaskSetManager:59 - Starting task 0.0 in stage 317.0 (TID 168, localhost, NODE_LOCAL, 7642 bytes)
2015-06-29 16:13:56 INFO Executor:59 - Running task 0.0 in stage 317.0 (TID 168)
2015-06-29 16:13:56 INFO KafkaRDD:103 - Computing topic test, partition 0 offsets 252661 -> 253192
2015-06-29 16:13:56 INFO VerifiableProperties:68 - Verifying properties
2015-06-29 16:13:56 INFO VerifiableProperties:68 - Property group.id is overridden to
2015-06-29 16:13:56 INFO VerifiableProperties:68 - Property zookeeper.connect is overridden to
2015-06-29 16:13:56 ERROR TaskContextImpl:96 - Error in TaskCompletionListener
java.lang.NullPointerException
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2015-06-29 16:13:56 ERROR Executor:96 - Exception in task 0.0 in stage 317.0 (TID 168)
org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2015-06-29 16:13:56 WARN TaskSetManager:71 - Lost task 0.0 in stage 317.0 (TID 168, localhost): org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
My pom.xml looks like this:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>alinds</groupId>
  <artifactId>alind</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>reference.conf</resource>
                </transformer>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.alind.sparkStream.Test</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.3.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.3.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.8.2.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>MyOtherProject</groupId>
      <version>1.0</version>
    </dependency>
  </dependencies>
  <repositories>
    <repository>
      <id>Spark repository</id>
      <url>http://www.sparkjava.com/nexus/content/repositories/spark/</url>
    </repository>
  </repositories>
</project>
And the Spark driver looks like this:
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;

public class Test {

    static Logger log = Logger.getLogger(Test.class.getName());

    public static void main(String[] args) {
        System.setProperty("spark.serializer",
                "org.apache.spark.serializer.KryoSerializer");
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("spark://clstr:7077");
        // When I run this code from Eclipse, I change the setMaster value to "local[2]".
        sparkConf.setAppName("alind");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(
                javaSparkContext, new Duration(100));

        Set<String> topics = new HashSet<String>();
        topics.add("test");

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "10.20.3.14:9092");
        // I also tested metadata.broker.list with localhost:9092; neither works on the cluster.

        JavaPairInputDStream<String, String> stream = KafkaUtils
                .createDirectStream(javaStreamingContext, String.class,
                        String.class, StringDecoder.class, StringDecoder.class,
                        kafkaParams, topics);
        stream.print();

        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }
}
If you could give me any idea what is wrong with the local cluster, I would be grateful. It seems like something is wrong on the Kafka side.
I had the same problem and the cause was that one of my decoders had the incorrect constructor. The exception is really misleading in this regard.
Incorrect class:
class ReadingCreatedDecoder()
  extends Decoder[Message[ReadingCreated]]
  with ReadingCreatedAvroSupport
Correct version (note the props: VerifiableProperties constructor parameter):
class ReadingCreatedDecoder(props: VerifiableProperties = null)
  extends Decoder[Message[ReadingCreated]]
  with ReadingCreatedAvroSupport
PS: I am using Scala and not Java.
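Since the question's driver is written in Java, here is a minimal sketch of the same fix in Java. The class name CustomStringDecoder is made up for illustration; the key point is the constructor that accepts kafka.utils.VerifiableProperties, because the direct Kafka stream instantiates the key and value decoders reflectively through that constructor.

import java.nio.charset.StandardCharsets;

import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;

// Hypothetical custom decoder that decodes message bytes as UTF-8 text.
public class CustomStringDecoder implements Decoder<String> {

    // This constructor is the important part: Spark creates decoder instances
    // reflectively with a VerifiableProperties argument, so a decoder that only
    // has a no-arg constructor fails inside the Kafka RDD (the failure this
    // answer describes).
    public CustomStringDecoder(VerifiableProperties props) {
        // nothing to configure for this simple example
    }

    @Override
    public String fromBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

With this constructor in place, CustomStringDecoder.class can be passed to KafkaUtils.createDirectStream wherever StringDecoder.class is used in the driver above.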