
Apache Spark + Kafka: TaskCompletionListenerException & KafkaRDD$KafkaRDDIterator.close NPE on local cluster (client mode)

My Spark Streaming code works seamlessly in the Eclipse IDE, but when I run it on a local Spark cluster it throws org.apache.spark.util.TaskCompletionListenerException.

With spark-submit in client mode the code also runs fine until I start my Kafka producer; as soon as the producer starts, it gives the following error.
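For anyone reproducing this, any producer on that broker and topic will do; e.g. the stock Kafka console producer (the $KAFKA_HOME path is an assumption):

sh $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 10.20.3.14:9092 --topic test
# broker and topic match the kafkaParams in the driver code below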

I start the local cluster with

sh $SPARK_HOME/sbin/start-all.sh

and call spark-submit with this script.

#!/bin/sh

SP_SUBMIT=/home/user/spark/bin/spark-submit
DEP_MODE=client

$SP_SUBMIT \
--deploy-mode $DEP_MODE \
--class com.alind.sparkStream.Test \
--master spark://clstr:7077 \
--name alind \
/home/user/jar/com.alind-0.0.1-SNAPSHOT.jar

And I get this error when the Spark stream starts receiving messages.

2015-06-29 16:13:56 ERROR JobScheduler:96 - Error running job streaming job 1435574590600 ms.3
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 306.0 failed 1 times, most recent failure: Lost task 0.0 in stage 306.0 (TID 164, localhost): org.apache.spark.util.TaskCompletionListenerException
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2015-06-29 16:13:56 WARN  JobProgressListener:71 - Task start for unknown stage 307
2015-06-29 16:13:56 WARN  JobProgressListener:71 - Task start for unknown stage 308
2015-06-29 16:13:56 WARN  JobProgressListener:71 - Task start for unknown stage 309
2015-06-29 16:13:56 INFO  SparkContext:59 - Starting job: foreach at Test.java:428
2015-06-29 16:13:56 INFO  MapOutputTrackerMaster:59 - Size of output statuses for shuffle 34 is 84 bytes
2015-06-29 16:13:56 INFO  MapOutputTrackerMaster:59 - Size of output statuses for shuffle 35 is 84 bytes
2015-06-29 16:13:56 INFO  DAGScheduler:59 - Got job 94 (foreach at Test.java:428) with 2 output partitions (allowLocal=false)
2015-06-29 16:13:56 INFO  DAGScheduler:59 - Final stage: Stage 327(foreach at Test.java:428)
2015-06-29 16:13:56 INFO  DAGScheduler:59 - Parents of final stage: List(Stage 320, Stage 317, Stage 324, Stage 321, Stage 318, Stage 325, Stage 322, Stage 326, Stage 323, Stage 319)
2015-06-29 16:13:56 INFO  ShuffledDStream:59 - Slicing from 1435574619500 ms to 1435574620400 ms (aligned to 1435574619500 ms and 1435574620400 ms)
2015-06-29 16:13:56 INFO  DAGScheduler:59 - Missing parents: List(Stage 320, Stage 317, Stage 318, Stage 319)
2015-06-29 16:13:56 INFO  DAGScheduler:59 - Submitting Stage 317 (MappedRDD[234] at mapToPair at Test.java:157), which has no missing parents
2015-06-29 16:13:56 INFO  MemoryStore:59 - ensureFreeSpace(4024) called with curMem=386851, maxMem=278302556
2015-06-29 16:13:56 INFO  MemoryStore:59 - Block broadcast_129 stored as values in memory (estimated size 3.9 KB, free 265.0 MB)
2015-06-29 16:13:56 INFO  MemoryStore:59 - ensureFreeSpace(2230) called with curMem=390875, maxMem=278302556
2015-06-29 16:13:56 INFO  MemoryStore:59 - Block broadcast_129_piece0 stored as bytes in memory (estimated size 2.2 KB, free 265.0 MB)
2015-06-29 16:13:56 INFO  BlockManagerInfo:59 - Added broadcast_129_piece0 in memory on localhost:42836 (size: 2.2 KB, free: 265.3 MB)
2015-06-29 16:13:56 INFO  BlockManagerMaster:59 - Updated info of block broadcast_129_piece0
2015-06-29 16:13:56 INFO  SparkContext:59 - Created broadcast 129 from getCallSite at DStream.scala:294
2015-06-29 16:13:56 INFO  DAGScheduler:59 - Submitting 1 missing tasks from Stage 317 (MappedRDD[234] at mapToPair at Test.java:157)
2015-06-29 16:13:56 INFO  TaskSchedulerImpl:59 - Adding task set 317.0 with 1 tasks
2015-06-29 16:13:56 INFO  TaskSetManager:59 - Starting task 0.0 in stage 317.0 (TID 168, localhost, NODE_LOCAL, 7642 bytes)
2015-06-29 16:13:56 INFO  Executor:59 - Running task 0.0 in stage 317.0 (TID 168)
2015-06-29 16:13:56 INFO  KafkaRDD:103 - Computing topic test, partition 0 offsets 252661 -> 253192
2015-06-29 16:13:56 INFO  VerifiableProperties:68 - Verifying properties
2015-06-29 16:13:56 INFO  VerifiableProperties:68 - Property group.id is overridden to 
2015-06-29 16:13:56 INFO  VerifiableProperties:68 - Property zookeeper.connect is overridden to 
2015-06-29 16:13:56 ERROR TaskContextImpl:96 - Error in TaskCompletionListener
java.lang.NullPointerException
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
    at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
    at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
2015-06-29 16:13:56 ERROR Executor:96 - Exception in task 0.0 in stage 317.0 (TID 168)
org.apache.spark.util.TaskCompletionListenerException
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
2015-06-29 16:13:56 WARN  TaskSetManager:71 - Lost task 0.0 in stage 317.0 (TID 168, localhost): org.apache.spark.util.TaskCompletionListenerException
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

My pom.xml looks like this:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>alinds</groupId>
    <artifactId>alind</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <build>
        <sourceDirectory>src</sourceDirectory>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>
                                      com.alind.sparkStream.Test
                                    </mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>

        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.3.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.3.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.2.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>MyOtherProject</groupId>
            <!-- artifactId was missing here; Maven requires one (name assumed) -->
            <artifactId>MyOtherProject</artifactId>
            <version>1.0</version>
        </dependency>

    </dependencies>
    <repositories>
        <repository>
            <id>Spark repository</id>
            <url>http://www.sparkjava.com/nexus/content/repositories/spark/</url>
        </repository>
    </repositories>
</project>
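For completeness, the jar passed to spark-submit is the shaded artifact that the shade plugin produces during the package phase (the output name comes from the artifactId, so it presumably gets copied to the /home/user/jar/ path used in the submit script):

mvn clean package
# the shaded jar ends up in target/, e.g. target/alind-0.0.1-SNAPSHOT.jar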

And the Spark driver looks like this:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;

public class Test {

    static Logger log = Logger.getLogger(Test.class.getName());

    public static void main(String[] args) {

        System.setProperty("spark.serializer",
                "org.apache.spark.serializer.KryoSerializer");

        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("spark://clstr:7077");
        // When I run this code from Eclipse, I change the setMaster value to "local[2]".
        sparkConf.setAppName("alind");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(
                javaSparkContext, new Duration(100));

        Set<String> topics = new HashSet<String>();
        topics.add("test");

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "10.20.3.14:9092");
        // Tested metadata.broker.list with localhost:9092 as well; it does not
        // work on the cluster with either value.

        JavaPairInputDStream<String, String> stream = KafkaUtils
                .createDirectStream(javaStreamingContext, String.class,
                        String.class, StringDecoder.class, StringDecoder.class,
                        kafkaParams, topics);
        stream.print();

        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }
}

If you could give me any idea what's wrong with the local cluster, I would be grateful. It seems like something on the Kafka end is wrong.

asked by Alind Billore


1 Answer

I had the same problem, and the cause was that one of my decoders had the wrong constructor. The exception is really misleading in this regard: Spark's KafkaRDDIterator instantiates the key and value decoders reflectively through a constructor that takes a kafka.utils.VerifiableProperties, so a decoder without that constructor blows up during iterator setup, and what you actually see is the NullPointerException thrown when the completion listener tries to close the half-initialised iterator (KafkaRDD.scala:158 above).

Incorrect class:

class ReadingCreatedDecoder()
  extends Decoder[Message[ReadingCreated]]
  with ReadingCreatedAvroSupport

Correct version (note the props: VerifiableProperties constructor parameter):

class ReadingCreatedDecoder(props: VerifiableProperties = null)
  extends Decoder[Message[ReadingCreated]]
  with ReadingCreatedAvroSupport

PS: I am using Scala, not Java.
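Since the question's driver is in Java, here is a rough sketch of the same fix there. MyEventDecoder and its String payload are hypothetical, purely for illustration; note that the built-in StringDecoder the question uses already has the required constructor:

import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;

// Hypothetical custom decoder; the name and payload type are assumptions,
// not part of the original question or answer.
public class MyEventDecoder implements Decoder<String> {

    // Spark's KafkaRDDIterator looks up exactly this constructor signature
    // by reflection; a decoder without it fails while the iterator is set up.
    public MyEventDecoder(VerifiableProperties props) {
        // props may be null; read any decoder-specific settings from it here
    }

    @Override
    public String fromBytes(byte[] bytes) {
        return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
    }
}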

answered by Saket