I'm dealing with some strange error messages that I think come down to a memory issue, but I'm having a hard time pinning it down and could use some guidance from the experts.
I have a 2-machine Spark (1.0.1) cluster. Both machines have 8 cores; one has 16GB of memory, and the other (the master) has 32GB. My application involves computing pairwise pixel affinities in images, though the images I've tested so far only get as big as 1920x1200, and as small as 16x16.
I did have to change a few memory and parallelism settings; otherwise I was getting explicit OutOfMemoryExceptions. In spark-defaults.conf:
spark.executor.memory 14g
spark.default.parallelism 32
spark.akka.frameSize 1000
In spark-env.sh:
SPARK_DRIVER_MEMORY=10G
With those settings, however, I get a bunch of WARN statements about "Lost TIDs" (no task is successfully completed) in addition to lost Executors, which are repeated 4 times until I finally get the following error message and crash:
14/07/18 12:06:20 INFO TaskSchedulerImpl: Cancelling stage 0
14/07/18 12:06:20 INFO DAGScheduler: Failed to run collect at /home/user/Programming/PySpark-Affinities/affinity.py:243
Traceback (most recent call last):
File "/home/user/Programming/PySpark-Affinities/affinity.py", line 243, in <module>
lambda x: np.abs(IMAGE.value[x[0]] - IMAGE.value[x[1]])
File "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/rdd.py", line 583, in collect
bytesInJava = self._jrdd.collect().iterator()
File "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
File "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o27.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:13 failed 4 times, most recent failure: TID 32 on host master.host.univ.edu failed for unknown reason
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/07/18 12:06:20 INFO DAGScheduler: Executor lost: 4 (epoch 4)
14/07/18 12:06:20 INFO BlockManagerMasterActor: Trying to remove executor 4 from BlockManagerMaster.
14/07/18 12:06:20 INFO BlockManagerMaster: Removed 4 successfully in removeExecutor
user@master:~/Programming/PySpark-Affinities$
If I run the really small image instead (16x16), it appears to run to completion (it gives me the output I expect without any exceptions being thrown). However, the stderr logs for the app that was run list the state as "KILLED", with the final message being an "ERROR CoarseGrainedExecutorBackend: Driver Disassociated". If I run any larger images, I get the exception I pasted above.
Furthermore, if I just do a spark-submit with master=local[*], aside from still needing to set the aforementioned memory options, it will work for an image of any size (I've tested both machines independently; they both do this when running as local[*]).
Any ideas what is going on?
If I had a penny for every time I asked people "have you tried increasing the number of partitions to something quite large, like at least 4 tasks per CPU, or even as high as 1000 partitions?" I'd be a rich man. So have you tried increasing the partitions?
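As a rough illustration of the partition advice, the sketch below derives a partition count from the cluster's core count and passes it to `sc.parallelize`. The helper names `suggested_partitions` and `parallelize_pairs` are hypothetical, `sc` is assumed to be an existing SparkContext, and the factor of 4 tasks per core and the optional minimum are only the rule of thumb above, not tuned values.

```python
def suggested_partitions(total_cores, tasks_per_core=4, minimum=1):
    """Rule-of-thumb partition count: several tasks per core, never below `minimum`."""
    if total_cores < 1 or tasks_per_core < 1:
        raise ValueError("total_cores and tasks_per_core must be positive")
    return max(total_cores * tasks_per_core, minimum)


def parallelize_pairs(sc, pairs, total_cores, minimum=1):
    """Distribute `pairs` over a partition count derived from the core count.

    `sc` is assumed to be a live SparkContext; the second argument of
    `parallelize` is the number of partitions (slices).
    """
    return sc.parallelize(pairs, suggested_partitions(total_cores, minimum=minimum))


# Two 8-core machines, as in the question: 16 cores * 4 tasks per core.
print(suggested_partitions(16))                # 64
print(suggested_partitions(16, minimum=1000))  # 1000
```

For the cluster in the question, the rule of thumb gives 64 partitions, twice the `spark.default.parallelism` of 32 that was configured.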
Anyway, other things I've found help with weird disassociations are:
Also, sometimes you get more informative stack traces by using the UI to navigate to the specific worker's stderr logs.
UPDATE: Since Spark 1.0.0, the Spark logs can no longer be found via the UI; you have to ask your sysadmin/devops to help you, since the location of the logs is completely undocumented.
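Once someone has told you where the logs live, a small script can pull out the interesting lines. The sketch below is only that: it assumes each executor's log sits in a file named `stderr` somewhere under a single work directory, and the `$SPARK_HOME/work` path and the `find_error_lines` helper are hypothetical placeholders to be replaced with whatever applies on your cluster.

```python
import os


def find_error_lines(work_dir, markers=("ERROR", "Exception")):
    """Yield (path, line) for each line in any `stderr` file under `work_dir`
    that contains one of `markers`."""
    for root, _dirs, files in os.walk(work_dir):
        if "stderr" not in files:
            continue
        path = os.path.join(root, "stderr")
        with open(path, errors="replace") as handle:
            for line in handle:
                if any(marker in line for marker in markers):
                    yield path, line.rstrip("\n")


if __name__ == "__main__":
    # Hypothetical location; substitute the worker's actual work directory.
    for path, line in find_error_lines(os.path.expandvars("$SPARK_HOME/work")):
        print(path + ": " + line)
```

Run it on each worker machine, since every worker keeps its own executors' logs.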