Spark Streaming Kafka - Job always quits when RDD contains an actual message

I'm trying to process messages from Kafka with Spark Streaming 1.3.1. The job runs locally on my PC.

When I start the job and there are no messages to consume, everything works fine. But as soon as I send a message to Kafka, the job logs a NullPointerException:

15/05/20 11:22:00 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.NullPointerException
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:54)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:73)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:71)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:71)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

15/05/20 11:22:00 ERROR Executor: Exception in task 1.0 in stage 4.0 (TID 9)
org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:81)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

15/05/20 11:22:00 WARN TaskSetManager: Lost task 1.0 in stage 4.0 (TID 9, localhost): org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:81)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

15/05/20 11:22:00 ERROR TaskSetManager: Task 1 in stage 4.0 failed 1 times; aborting job
15/05/20 11:22:00 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool 
15/05/20 11:22:00 INFO TaskSchedulerImpl: Cancelling stage 4
15/05/20 11:22:00 INFO DAGScheduler: Stage 4 (foreachRDD at SimpleApp.java:55) failed in 0,018 s
15/05/20 11:22:00 INFO DAGScheduler: Job 4 failed: foreachRDD at SimpleApp.java:55, took 0,026982 s
15/05/20 11:22:00 ERROR JobScheduler: Error running job streaming job 1432113720000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 1 times, most recent failure: Lost task 1.0 in stage 4.0 (TID 9, localhost): org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:81)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Here is the code of the consumer job. The custom decoder TraceEventDecoder simply deserializes the byte array of a Kafka message into a TraceEvent object; a sketch of what such a decoder might look like follows the code.

public static void main(final String[] args) throws InterruptedException, IOException {
    System.out.println("Job started");
    SparkConf conf = new SparkConf().setAppName("HandlingTimeFilter");
    conf.setMaster("local[10]");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("zookeeper.connect", "cer:2181");
    kafkaParams.put("bootstrap.servers", "cer:9099,lanthan:9099,uran:9099,protactinium:9099");
    kafkaParams.put("consumer.id", "SimpleSparkConsumer");
    kafkaParams.put("group.id", "consumers");

    Set<String> topics = new HashSet<>();
    topics.add("traceEvents");

    // Direct (receiver-less) stream; keys are decoded with StringDecoder,
    // values with the custom TraceEventDecoder.
    JavaPairInputDStream<String, TraceEvent> stream =
            KafkaUtils.createDirectStream(
                    jssc, String.class, TraceEvent.class,
                    StringDecoder.class, TraceEventDecoder.class,
                    kafkaParams, topics);

    stream.foreachRDD((Function<JavaPairRDD<String, TraceEvent>, Void>) rdd -> {
        // Collect the batch to the driver just to log its size.
        Map<String, TraceEvent> events = rdd.collectAsMap();
        System.out.println("New RDD call, size=" + events.size());
        return null;
    });

    jssc.start();

    Thread.sleep(60000);
    jssc.stop();
}
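
The TraceEventDecoder class itself isn't shown here; a minimal sketch of what it might look like, assuming TraceEvent implements java.io.Serializable and the producer uses plain Java serialization:

    import java.io.ByteArrayInputStream;
    import java.io.ObjectInputStream;

    import kafka.serializer.Decoder;
    import kafka.utils.VerifiableProperties;

    // Hypothetical sketch; the real TraceEventDecoder is not shown in the question.
    public class TraceEventDecoder implements Decoder<TraceEvent> {

        public TraceEventDecoder(VerifiableProperties props) {
            // No configuration needed for plain Java deserialization.
        }

        @Override
        public TraceEvent fromBytes(byte[] bytes) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (TraceEvent) in.readObject();
            } catch (Exception e) {
                throw new RuntimeException("Could not deserialize TraceEvent", e);
            }
        }
    }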
asked May 20 '15 by CruLPlay


2 Answers

I had this issue with Spark 1.2.0, and upgrading to 1.3.1 fixed it for me. I think the underlying problem might be missing dependencies, though. If you're using Maven, you can adjust your pom.xml to build a proper uber jar that bundles them:

<project>
<groupId>com.databricks.apps.logs</groupId>
<artifactId>log-analyzer</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Databricks Spark Logs Analyzer</name>
<packaging>jar</packaging>
<version>1.0</version>
<repositories>
    <repository>
        <id>Akka repository</id>
        <url>http://repo.akka.io/releases</url>
    </repository>
</repositories>
<dependencies>
    <dependency> <!-- Spark -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.3.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency> <!-- Spark SQL -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.3.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency> <!-- Spark Streaming -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.3.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency> <!-- Command Line Parsing -->
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
        <version>1.2</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <finalName>uber-${project.artifactId}-${project.version}</finalName>
            </configuration>
        </plugin>
    </plugins>
</build>
</project>

Source: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/missing_dependencies_in_jar_files.html (I updated the versions above to Spark 1.3.1 and added the Kafka dependency.)

answered by Steven


Is your TraceEventDecoder constructor public, and does a TraceEventDecoder(VerifiableProperties) variant exist? This error can happen when something is wrong with the parameters used to initialize the direct Kafka stream. I've submitted an issue to Apache for this case: https://issues.apache.org/jira/browse/SPARK-9780
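
For context: Spark's KafkaRDD creates the key and value decoders reflectively, roughly like the simplified Java rendering below (the class and variable names here are illustrative, not the actual Spark internals):

    import java.util.Properties;

    import kafka.serializer.Decoder;
    import kafka.utils.VerifiableProperties;

    // Simplified illustration of the reflective lookup Spark performs when
    // building a KafkaRDD iterator. If TraceEventDecoder lacks a public
    // TraceEventDecoder(VerifiableProperties) constructor, getConstructor(...)
    // throws, the iterator never finishes initializing, and the completion
    // hook then fails with the NullPointerException shown in the question.
    public final class DecoderInstantiationDemo {
        public static void main(String[] args) throws Exception {
            VerifiableProperties props = new VerifiableProperties(new Properties());
            Decoder<TraceEvent> valueDecoder = TraceEventDecoder.class
                    .getConstructor(VerifiableProperties.class)
                    .newInstance(props);
            System.out.println("Decoder instantiated: " + valueDecoder);
        }
    }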

answered by Nemaefar