Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark ExecutorLostFailure

Tags:

apache-spark

I'm trying to run spark 1.5 on mesos in cluster mode. I'm able to launch the dispatcher and to run the spark-submit. But when I do so, the spark driver fails with the following:

I1111 16:21:33.515130 25325 fetcher.cpp:414] Fetcher Info: {"cache_directory":"\/tmp\/mesos\/fetch\/slaves\/2bbe0c3b-433b-45e0-938b-f4d4532df129-S29","items":[{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/home\/optimus.prime\/Test.jar"}}],"sandbox_directory":"\/tmp\/mesos\/slaves\/2bbe0c3b-433b-45e0-938b-f4d4532df129-S29\/frameworks\/2bbe0c3b-433b-45e0-938b-f4d4532df129-0114\/executors\/driver-20151111162132-0036\/runs\/f0e8f4d7-35cb-4b73-bb5f-1112de2d8156"}
I1111 16:21:33.516376 25325 fetcher.cpp:369] Fetching URI '/home/optimus.prime/Test.jar'
I1111 16:21:33.516388 25325 fetcher.cpp:243] Fetching directly into the sandbox directory
I1111 16:21:33.516407 25325 fetcher.cpp:180] Fetching URI '/home/optimus.prime/Test.jar'
I1111 16:21:33.516417 25325 fetcher.cpp:160] Copying resource with command:cp '/home/optimus.prime/Test.jar' '/tmp/mesos/slaves/2bbe0c3b-433b-45e0-938b-f4d4532df129-S29/frameworks/2bbe0c3b-433b-45e0-938b-f4d4532df129-0114/executors/driver-20151111162132-0036/runs/f0e8f4d7-35cb-4b73-bb5f-1112de2d8156/Test.jar'
W1111 16:21:33.619190 25325 fetcher.cpp:265] Copying instead of extracting resource from URI with 'extract' flag, because it does not seem to be an archive: /home/optimus.prime/Test.jar
I1111 16:21:33.619221 25325 fetcher.cpp:446] Fetched '/home/optimus.prime/Test.jar' to '/tmp/mesos/slaves/2bbe0c3b-433b-45e0-938b-f4d4532df129-S29/frameworks/2bbe0c3b-433b-45e0-938b-f4d4532df129-0114/executors/driver-20151111162132-0036/runs/f0e8f4d7-35cb-4b73-bb5f-1112de2d8156/Test.jar'
I1111 16:21:33.769359 25335 exec.cpp:134] Version: 0.25.0
I1111 16:21:33.774183 25341 exec.cpp:208] Executor registered on slave 2bbe0c3b-433b-45e0-938b-f4d4532df129-S29
WARNING: Your kernel does not support swap limit capabilities. Limitation discarded.
15/11/11 16:21:34 INFO SparkContext: Running Spark version 1.5.1
15/11/11 16:21:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/11/11 16:21:35 INFO SecurityManager: Changing view acls to: root
15/11/11 16:21:35 INFO SecurityManager: Changing modify acls to: root
15/11/11 16:21:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/11/11 16:21:36 INFO Slf4jLogger: Slf4jLogger started
15/11/11 16:21:36 INFO Remoting: Starting remoting
15/11/11 16:21:36 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:36818]
15/11/11 16:21:36 INFO Utils: Successfully started service 'sparkDriver' on port 36818.
15/11/11 16:21:36 INFO SparkEnv: Registering MapOutputTracker
15/11/11 16:21:36 INFO SparkEnv: Registering BlockManagerMaster
15/11/11 16:21:37 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-2e733585-81ae-45ad-b81d-f2b977e38153
15/11/11 16:21:37 INFO MemoryStore: MemoryStore started with capacity 1069.1 MB
15/11/11 16:21:37 INFO HttpFileServer: HTTP File server directory is /tmp/spark-bbd7944b-7ffc-4911-a51b-5bed4e174fad/httpd-f94199aa-972d-4724-ad9e-f237401c6bab
15/11/11 16:21:37 INFO HttpServer: Starting HTTP Server
15/11/11 16:21:37 INFO Utils: Successfully started service 'HTTP file server' on port 53947.
15/11/11 16:21:37 INFO SparkEnv: Registering OutputCommitCoordinator
15/11/11 16:21:37 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/11/11 16:21:37 INFO SparkUI: Started SparkUI at http://10.241.10.12:4040
15/11/11 16:21:37 INFO SparkContext: Added JAR file:/mnt/mesos/sandbox/Test.jar at http://10.241.10.12:53947/jars/Test.jar with timestamp 1447258897676
15/11/11 16:21:37 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
I1111 16:21:37.906981    96 sched.cpp:164] Version: 0.25.0
2015-11-11 16:21:37,907:9(0x7f67d2d3c700):ZOO_INFO@log_env@712: Client environment:zookeeper.version=zookeeper C client 3.4.5
2015-11-11 16:21:37,907:9(0x7f67d2d3c700):ZOO_INFO@log_env@716: Client environment:host.name=mesos-slaves-spark-bjrg
2015-11-11 16:21:37,907:9(0x7f67d2d3c700):ZOO_INFO@log_env@723: Client environment:os.name=Linux
2015-11-11 16:21:37,907:9(0x7f67d2d3c700):ZOO_INFO@log_env@724: Client environment:os.arch=3.19.0-33-generic
2015-11-11 16:21:37,907:9(0x7f67d2d3c700):ZOO_INFO@log_env@725: Client environment:os.version=#38~14.04.1-Ubuntu SMP Fri Nov 6 18:17:28 UTC 2015
2015-11-11 16:21:37,907:9(0x7f67d2d3c700):ZOO_INFO@log_env@733: Client environment:user.name=(null)
2015-11-11 16:21:37,907:9(0x7f67d2d3c700):ZOO_INFO@log_env@741: Client environment:user.home=/root
2015-11-11 16:21:37,908:9(0x7f67d2d3c700):ZOO_INFO@log_env@753: Client environment:user.dir=/opt/spark
2015-11-11 16:21:37,908:9(0x7f67d2d3c700):ZOO_INFO@zookeeper_init@786: Initiating client connection, host=10.241.10.3:2181,10.241.10.4:2181,110.241.10.5:2181 sessionTimeout=10000 watcher=0x7f67dc7e3600 sessionId=0 sessionPasswd=<null> context=0x7f67ec021650 flags=0
2015-11-11 16:21:37,915:9(0x7f67d1438700):ZOO_INFO@check_events@1703: initiated connection to server [10.241.10.3:2181]
2015-11-11 16:21:37,917:9(0x7f67d1438700):ZOO_INFO@check_events@1750: session establishment complete on server [10.241.10.3:2181], sessionId=0x150a0c4f8a720bd, negotiated timeout=10000
I1111 16:21:37.917933    91 group.cpp:331] Group process (group(1)@10.241.10.12:59519) connected to ZooKeeper
I1111 16:21:37.918011    91 group.cpp:805] Syncing group operations: queue size (joins, cancels, datas) = (0, 0, 0)
I1111 16:21:37.918088    91 group.cpp:403] Trying to create path '/mesos' in ZooKeeper
I1111 16:21:37.919067    91 detector.cpp:156] Detected a new leader: (id='11')
I1111 16:21:37.919288    91 group.cpp:674] Trying to get '/mesos/json.info_0000000011' in ZooKeeper
I1111 16:21:37.919922    91 detector.cpp:481] A new leading master ([email protected]:5050) is detected
I1111 16:21:37.920075    91 sched.cpp:262] New master detected at [email protected]:5050
I1111 16:21:37.920300    91 sched.cpp:272] No credentials provided. Attempting to register without authentication
I1111 16:21:37.926208    88 sched.cpp:641] Framework registered with 2bbe0c3b-433b-45e0-938b-f4d4532df129-0163
15/11/11 16:21:37 INFO MesosSchedulerBackend: Registered as framework ID 2bbe0c3b-433b-45e0-938b-f4d4532df129-0163
15/11/11 16:21:38 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 57551.
15/11/11 16:21:38 INFO NettyBlockTransferService: Server created on 57551
15/11/11 16:21:38 INFO BlockManagerMaster: Trying to register BlockManager
15/11/11 16:21:38 INFO BlockManagerMasterEndpoint: Registering block manager 10.241.10.12:57551 with 1069.1 MB RAM, BlockManagerId(driver, 10.241.10.12, 57551)
15/11/11 16:21:38 INFO BlockManagerMaster: Registered BlockManager
15/11/11 16:21:39 INFO SparkContext: Starting job: sumApprox at Test.scala:21
15/11/11 16:21:39 INFO DAGScheduler: Got job 0 (sumApprox at Test.scala:21) with 8 output partitions
15/11/11 16:21:39 INFO DAGScheduler: Final stage: ResultStage 0(sumApprox at Test.scala:21)
15/11/11 16:21:39 INFO DAGScheduler: Parents of final stage: List()
15/11/11 16:21:39 INFO DAGScheduler: Missing parents: List()
15/11/11 16:21:39 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at numericRDDToDoubleRDDFunctions at Test.scala:21), which has no missing parents
15/11/11 16:21:39 INFO MemoryStore: ensureFreeSpace(1760) called with curMem=0, maxMem=1120995901
15/11/11 16:21:39 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1760.0 B, free 1069.1 MB)
15/11/11 16:21:39 INFO MemoryStore: ensureFreeSpace(1151) called with curMem=1760, maxMem=1120995901
15/11/11 16:21:39 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1151.0 B, free 1069.1 MB)
15/11/11 16:21:39 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.241.10.12:57551 (size: 1151.0 B, free: 1069.1 MB)
15/11/11 16:21:39 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861
15/11/11 16:21:39 INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at numericRDDToDoubleRDDFunctions at Test.scala:21)
15/11/11 16:21:39 INFO TaskSchedulerImpl: Adding task set 0.0 with 8 tasks
15/11/11 16:21:39 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.241.10.15, PROCESS_LOCAL, 2053 bytes)
15/11/11 16:21:39 INFO TaskSetManager: Re-queueing tasks for 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 from TaskSet 0.0
15/11/11 16:21:39 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.241.10.15): ExecutorLostFailure (executor 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 lost)
15/11/11 16:21:39 INFO DAGScheduler: Executor lost: 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 (epoch 0)
15/11/11 16:21:39 INFO BlockManagerMasterEndpoint: Trying to remove executor 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 from BlockManagerMaster.
15/11/11 16:21:39 INFO BlockManagerMaster: Removed 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 successfully in removeExecutor
15/11/11 16:21:39 INFO DAGScheduler: Host added was in lost list earlier: 10.241.10.15
15/11/11 16:21:39 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, 10.241.10.15, PROCESS_LOCAL, 2053 bytes)
15/11/11 16:21:40 INFO TaskSetManager: Re-queueing tasks for 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 from TaskSet 0.0
15/11/11 16:21:40 WARN TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1, 10.241.10.15): ExecutorLostFailure (executor 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 lost)
15/11/11 16:21:40 INFO DAGScheduler: Executor lost: 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 (epoch 1)
15/11/11 16:21:40 INFO BlockManagerMasterEndpoint: Trying to remove executor 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 from BlockManagerMaster.
15/11/11 16:21:40 INFO BlockManagerMaster: Removed 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 successfully in removeExecutor
15/11/11 16:21:40 INFO DAGScheduler: Host added was in lost list earlier: 10.241.10.15
15/11/11 16:21:40 INFO TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, 10.241.10.15, PROCESS_LOCAL, 2053 bytes)
15/11/11 16:21:40 INFO TaskSetManager: Re-queueing tasks for 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 from TaskSet 0.0
15/11/11 16:21:40 WARN TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2, 10.241.10.15): ExecutorLostFailure (executor 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 lost)
15/11/11 16:21:40 INFO DAGScheduler: Executor lost: 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 (epoch 2)
15/11/11 16:21:40 INFO BlockManagerMasterEndpoint: Trying to remove executor 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 from BlockManagerMaster.
15/11/11 16:21:40 INFO BlockManagerMaster: Removed 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 successfully in removeExecutor
15/11/11 16:21:40 INFO DAGScheduler: Host added was in lost list earlier: 10.241.10.15
15/11/11 16:21:40 INFO TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, 10.241.10.15, PROCESS_LOCAL, 2053 bytes)
15/11/11 16:21:40 INFO TaskSetManager: Re-queueing tasks for 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 from TaskSet 0.0
15/11/11 16:21:40 WARN TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3, 10.241.10.15): ExecutorLostFailure (executor 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 lost)
15/11/11 16:21:40 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
15/11/11 16:21:40 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/11/11 16:21:40 INFO TaskSchedulerImpl: Cancelling stage 0
15/11/11 16:21:40 INFO DAGScheduler: ResultStage 0 (sumApprox at Test.scala:21) failed in 0.713 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.241.10.15): ExecutorLostFailure (executor 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 lost)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/11/11 16:21:40 INFO DAGScheduler: Executor lost: 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 (epoch 3)
15/11/11 16:21:40 INFO SparkContext: Invoking stop() from shutdown hook
15/11/11 16:21:40 INFO BlockManagerMasterEndpoint: Trying to remove executor 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 from BlockManagerMaster.
15/11/11 16:21:40 INFO BlockManagerMaster: Removed 2bbe0c3b-433b-45e0-938b-f4d4532df129-S31 successfully in removeExecutor
15/11/11 16:21:40 INFO DAGScheduler: Host added was in lost list earlier: 10.241.10.15
15/11/11 16:21:40 INFO SparkUI: Stopped Spark web UI at http://10.241.10.12:4040
15/11/11 16:21:40 INFO DAGScheduler: Stopping DAGScheduler
I1111 16:21:40.447157   108 sched.cpp:1771] Asked to stop the driver
I1111 16:21:40.447325    87 sched.cpp:1040] Stopping framework '2bbe0c3b-433b-45e0-938b-f4d4532df129-0163'
15/11/11 16:21:40 INFO MesosSchedulerBackend: driver.run() returned with code DRIVER_STOPPED
15/11/11 16:21:40 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/11/11 16:21:40 INFO MemoryStore: MemoryStore cleared
15/11/11 16:21:40 INFO BlockManager: BlockManager stopped
15/11/11 16:21:40 INFO BlockManagerMaster: BlockManagerMaster stopped
15/11/11 16:21:40 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/11/11 16:21:40 INFO SparkContext: Successfully stopped SparkContext
15/11/11 16:21:40 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/11/11 16:21:40 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/11/11 16:21:40 INFO ShutdownHookManager: Shutdown hook called
15/11/11 16:21:40 INFO ShutdownHookManager: Deleting directory /tmp/spark-bbd7944b-7ffc-4911-a51b-5bed4e174fad

Also, since I'm using docker, I've search for the logs of the slave that should execute the task, and I get:

root@bfa1a77de2af:/opt/spark# exit
exit

Any idea on whats the error?

Thanks

like image 989
luis.alves Avatar asked Nov 11 '15 16:11

luis.alves


People also ask

What happens if a Spark executor fails?

FileAlreadyExistsException in Spark jobs As a result, the FileAlreadyExistsException error occurs. When any Spark executor fails, Spark retries to start the task, which might result into FileAlreadyExistsException error after the maximum number of retries.

How can we prevent Spark out of memory error?

In case of an inappropriate number of spark cores for our executors, we will have to process too many partitions. All these will be running in parallel and will have their memory overhead; therefore, they need the executor memory and can probably cause OutOfMemory errors. To fix this, we can configure spark. default.

How to check Spark job failure?

When a Spark job or application fails, you can use the Spark logs to analyze the failures. The QDS UI provides links to the logs in the Application UI and Spark Application UI. If you are running the Spark job or application from the Analyze page, you can access the logs via the Application UI and Spark Application UI.

How to handle Spark job failure?

To do this, click on Stages in the Spark UI and then look for the Failed Stages section at the bottom of the page. If an executor runs into memory issues, it will fail the task and restart where the last task left off.


2 Answers

I was getting similar issues and used some trial-and-error to find the cause and solution. I might not be able to give the 'real' reason but trying it out the below way can help you resolve it.

Try launching spark-shell with memory and core parameters:

spark-shell 
--driver-memory=2g 
--executor-memory=7g 
--num-executors=8 
--executor-cores=4 
--conf "spark.storage.memoryFraction=1" // important
--conf "spark.akka.frameSize=200" // keep it sufficiently high, maybe higher than 100 is a good thing
--conf "spark.default.parallelism=100" 
--conf "spark.core.connection.ack.wait.timeout=600" 
--conf "spark.yarn.executor.memoryOverhead=2048" // (in mb) not really valid for shell, but good thing for spark-submit
--conf "spark.yarn.driver.memoryOverhead=400" // not really valid for shell, but good thing for spark-submit. minimum 384 (in mb)

Now, if total memory (driver memory + num executors * executor memory) goes beyond available memory, it's going to throw error. I believe that's not the case for you.

Executor cores, keep it small, say, 2 or 4.

executor memory = (total memory - driver memory)/ number of executors .. actually a little less.

  • Try increasing number of executors while reducing the executor memory to keep the memory under control.
  • Once spark-shell starts, go to the job in job monitor and check the 'executors' tab and you can that even if you put, say, 20 executors, only 10 are getting created. That's an indication of how far you can go.
  • Reduce the number of executors to a suitable number below that max number and change the 'executor memory' parameter accordingly.
  • Once you reach an executor number that you're putting in spark-shell and you're getting the same number of executors, you're 'almost' good.

Next is to run the code in the spark-shell prompt and check how much memory is getting utilized in the Executors tab.

  • If you find that the last few 'collection' steps are taking a lot of time, the executor memory needs to increase.
  • If increasing the executor memory goes beyond the limit as we calculated earlier, then decrease the number of executors and assign more memory to each.

What I understood (empirical though) is that, following type of problems can occur:

  • a reduce/ shuffle operation running for long, doing a time-out
  • long running thread creating non-responsive actors
  • not-enough akka frames to watch over too many threads (tasks)

I hope this helps you to get the right configuration. Once that's set, you can use the same configurations during submitting a spark-submit job.

Note: I got a cluster with a lot of resource constraints and multiple users using it in ad-hoc ways .. making resources uncertain and therefore calculations have to be in the 'safer' limit. This resulted in a lot of iterative experiments.

like image 57
Partha Mishra Avatar answered Nov 27 '22 01:11

Partha Mishra


Almost always when I had 'executor lost' failures in Spark adding more memory solved these problems. Try increasing values for --executor-memory and/or --driver-memory options that you may pass to spark-submit.

like image 28
godfryd Avatar answered Nov 27 '22 00:11

godfryd