
org.apache.spark.SparkException: Job aborted due to stage failure: Task from application

Tags:

apache-spark

I have a problem running a Spark application on a standalone cluster (I am using Spark 1.1.0). I successfully started the master with:

bash start-master.sh 

Then I started one worker with:

bash spark-class org.apache.spark.deploy.worker.Worker spark://fujitsu11:7077

At the master's web UI:

http://localhost:8080  

I can see that the master and the worker are running.

Then I run my application from Eclipse Luna. I successfully connect to the cluster with:

JavaSparkContext sc = new JavaSparkContext("spark://fujitsu11:7077", "myapplication");

After that the application works, but when the program reaches the following line:

 JavaRDD<Document> collectionRdd = sc.parallelize(list);

it crashes with the following error message:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 4 times, most recent failure: Lost task 7.3 in stage 0.0 (TID 11, fujitsu11.inevm.ru): java.lang.ClassNotFoundException: maven.maven1.Document
    java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    java.security.AccessController.doPrivileged(Native Method)
    java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    java.lang.Class.forName0(Native Method)
    java.lang.Class.forName(Class.java:270)
    org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
    java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
    java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
    org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:74)
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:606)
    java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:744)
 Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

In the shell I found:

14/11/12 18:46:06 INFO ExecutorRunner: Launch command: "C:\PROGRA~1\Java\jdk1.7.0_51/bin/java" "-cp" ";;D:\spark\bin\..\conf;D:\spark\bin\..\lib\spark-assembly-1.1.0-hadoop1.0.4.jar;;D:\spark\bin\..\lib\datanucleus-api-jdo-3.2.1.jar;D:\spark\bin\..\lib\datanucleus-core-3.2.2.jar;D:\spark\bin\..\lib\datanucleus-rdbms-3.2.1.jar" "-XX:MaxPermSize=128m" "-Dspark.driver.port=50913" "-Xms512M" "-Xmx512M" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "akka.tcp://sparkDriver@192.168.3.5:50913/user/CoarseGrainedScheduler" "0" "fujitsu11.inevm.ru" "8" "akka.tcp://sparkWorker@fujitsu11.inevm.ru:50892/user/Worker" "app-20141112184605-0000"
14/11/12 18:46:40 INFO Worker: Asked to kill executor app-20141112184605-0000/0
14/11/12 18:46:40 INFO ExecutorRunner: Runner thread for executor app-20141112184605-0000/0 interrupted
14/11/12 18:46:40 INFO ExecutorRunner: Killing process!
14/11/12 18:46:40 INFO Worker: Executor app-20141112184605-0000/0 finished with state KILLED exitStatus 1
14/11/12 18:46:40 INFO LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40192.168.3.5%3A50955-2#1066511138] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14/11/12 18:46:40 INFO LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40192.168.3.5%3A50955-2#1066511138] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14/11/12 18:46:41 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@fujitsu11.inevm.ru:50892] -> [akka.tcp://sparkExecutor@fujitsu11.inevm.ru:50954]: Error [Association failed with [akka.tcp://sparkExecutor@fujitsu11.inevm.ru:50954]] [
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@fujitsu11.inevm.ru:50954]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: no further information: fujitsu11.inevm.ru/192.168.3.5:50954
]
14/11/12 18:46:42 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@fujitsu11.inevm.ru:50892] -> [akka.tcp://sparkExecutor@fujitsu11.inevm.ru:50954]: Error [Association failed with [akka.tcp://sparkExecutor@fujitsu11.inevm.ru:50954]] [
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@fujitsu11.inevm.ru:50954]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: no further information: fujitsu11.inevm.ru/192.168.3.5:50954
]
14/11/12 18:46:43 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@fujitsu11.inevm.ru:50892] -> [akka.tcp://sparkExecutor@fujitsu11.inevm.ru:50954]: Error [Association failed with [akka.tcp://sparkExecutor@fujitsu11.inevm.ru:50954]] [
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@fujitsu11.inevm.ru:50954]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: no further information: fujitsu11.inevm.ru/192.168.3.5:50954
]

In the master's logs:

14/11/12 18:46:41 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@fujitsu11:7077] -> [akka.tcp://sparkDriver@192.168.3.5:50913]: Error [Association failed with [akka.tcp://sparkDriver@192.168.3.5:50913]] [
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkDriver@192.168.3.5:50913]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: no further information: fujitsu11.inevm.ru/192.168.3.5:50913
]
14/11/12 18:46:42 INFO Master: akka.tcp://sparkDriver@192.168.3.5:50913 got disassociated, removing it.
14/11/12 18:46:42 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@fujitsu11:7077] -> [akka.tcp://sparkDriver@192.168.3.5:50913]: Error [Association failed with [akka.tcp://sparkDriver@192.168.3.5:50913]] [
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkDriver@192.168.3.5:50913]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: no further information: fujitsu11.inevm.ru/192.168.3.5:50913
]
14/11/12 18:46:43 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@fujitsu11:7077] -> [akka.tcp://sparkDriver@192.168.3.5:50913]: Error [Association failed with [akka.tcp://sparkDriver@192.168.3.5:50913]] [
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkDriver@192.168.3.5:50913]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: no further information: fujitsu11.inevm.ru/192.168.3.5:50913
]

I have googled a lot but have no idea what's wrong. I found a somewhat similar discussion here:

https://github.com/datastax/spark-cassandra-connector/issues/187

But it doesn't solve my problem.

Does anybody know what's wrong?

Thank you.

asked Nov 12 '14 by dimson

3 Answers

For the benefit of others running into this problem:

I faced an identical issue due to a version mismatch between the Spark connector and the Spark version in use: Spark was 1.3.1 and the connector was 1.3.0, and an identical error message appeared:

org.apache.spark.SparkException: Job aborted due to stage failure:
  Task 2 in stage 0.0 failed 4 times, most recent failure: Lost 
  task 2.3 in stage 0.0

Updating the dependency in SBT solved the problem.
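
As a minimal sketch of the fix, assuming sbt and the DataStax spark-cassandra-connector (the artifact names and versions here are illustrative; the point is that the two versions line up):

    // build.sbt (illustrative): keep the connector's version in step with Spark's
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.3.1",
      "com.datastax.spark" %% "spark-cassandra-connector" % "1.3.1"
    )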

answered by Lyuben Todorov


Found a way to run it using an IDE / Maven:

  1. Create a fat jar (one which includes all dependencies). Use the Maven Shade Plugin for this. Example pom:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.2</version>
    <configuration>
        <filters>
            <filter>
                <artifact>*:*</artifact>
                <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                </excludes>
            </filter>
        </filters>
    </configuration>
    <executions>
        <execution>
            <id>job-driver-jar</id>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <shadedArtifactAttached>true</shadedArtifactAttached>
                <shadedClassifierName>driver</shadedClassifierName>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <!--
                    Some care is required:
                    http://doc.akka.io/docs/akka/snapshot/general/configuration.html
                    -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>reference.conf</resource>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>mainClass</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
        <execution>
            <id>worker-library-jar</id>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <shadedArtifactAttached>true</shadedArtifactAttached>
                <shadedClassifierName>worker</shadedClassifierName>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
  2. Now we have to send the compiled jar file to the cluster. For this, specify the jar file in the Spark config like this:

SparkConf conf = new SparkConf()
    .setAppName("appName")
    .setMaster("spark://machineName:7077")
    .setJars(new String[] {"target/appName-1.0-SNAPSHOT-driver.jar"});

  3. Run mvn clean package to create the jar file. It will be created in your target folder.

  4. Run using your IDE or using the Maven command:

mvn exec:java -Dexec.mainClass="className"

This does not require spark-submit. Just remember to package the jar before running.

If you don't want to hardcode the jar path, you can do this:

  1. In the config, write (see the caveat after this list):

SparkConf conf = new SparkConf()
    .setAppName("appName")
    .setMaster("spark://machineName:7077")
    .setJars(JavaSparkContext.jarOfClass(this.getClass()));

  2. Create the fat jar (as above) and, after running the package command, launch the driver jar directly:

java -jar target/application-1.0-SNAPSHOT-driver.jar

This will pick up the jar that the class was loaded from.
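
One caveat with jarOfClass(this.getClass()): it only works from an instance context. From a static main, a minimal sketch of the same idea passes a class literal instead (MyApp is a hypothetical name standing in for your application class):

    // From a static main, reference the application class directly;
    // jarOfClass returns the jar(s) that class was loaded from.
    SparkConf conf = new SparkConf()
        .setAppName("appName")
        .setMaster("spark://machineName:7077")
        .setJars(JavaSparkContext.jarOfClass(MyApp.class));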

answered by Aditya Pawade


I came across the same error message; in my case my RDD was empty and an aggregation was attempted against it.

Listing this case here for the benefit of others running into this error message: Job aborted due to stage failure: Task 9 in stage 24.0 failed 4 times

The advice at the link below helped: "... the RDD is getting empty. The null pointer exception indicates that an aggregation task is attempted against a null value. Check your data for nulls where they should not be present, especially in the columns that are subject to aggregation." https://community.cloudera.com/t5/Support-Questions/PySpark-failuer-spark-SparkException-Job-aborted-due-to/td-p/171147
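
As a minimal, self-contained sketch of that advice using the Java API from the question (the data and class name are hypothetical; JavaRDD.isEmpty() requires Spark 1.3+):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SafeAggregation {
        public static void main(String[] args) {
            // local[*] keeps the sketch self-contained; use your cluster URL in practice.
            SparkConf conf = new SparkConf().setAppName("safeAggregation").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // Hypothetical input with a stray null that would break an aggregation.
            List<Integer> data = Arrays.asList(1, null, 3);

            // Drop nulls before any aggregation is attempted.
            JavaRDD<Integer> rdd = sc.parallelize(data).filter(x -> x != null);

            // Guard against an empty RDD before aggregating.
            if (rdd.isEmpty()) {
                System.out.println("RDD is empty; skipping aggregation");
            } else {
                System.out.println("sum = " + rdd.reduce(Integer::sum));
            }
            sc.stop();
        }
    }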

answered by KUMN