Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why is "Error communicating with MapOutputTracker" reported when Spark tries to send GetMapOutputStatuses?

I'm using Spark 1.3 to do an aggregation on a lot of data. The job consists of 4 steps:

  1. Read a big (1TB) sequence file (corresponding to 1 day of data)
  2. Filter out most of it and get about 1GB of shuffle write
  3. keyBy customer
  4. aggregateByKey() to a custom structure that build a profile for that customer, corresponding to a HashMap[Long, Float] per customer. The Long keys are unique and never bigger than 50K distinct entries.

I'm running this with this configuration:

--name geo-extract-$1-askTimeout \
--executor-cores 8 \
--num-executors 100 \
--executor-memory 40g \
--driver-memory 4g \
--driver-cores 8 \
--conf 'spark.storage.memoryFraction=0.25' \
--conf 'spark.shuffle.memoryFraction=0.35' \
--conf 'spark.kryoserializer.buffer.max.mb=1024' \
--conf 'spark.akka.frameSize=1024' \
--conf 'spark.akka.timeout=200' \
--conf 'spark.akka.askTimeout=111' \
--master yarn-cluster \

And getting this error:

    org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
        ...
    Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(0)]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
        ... 21 more
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)

The job and the logic have been shown to work with a small test set and I can even run this job for some dates but not for others. I've googled around and found hints that "Error communicating with MapOutputTracker" is related to internal Spark messages, but I already increased "spark.akka.frameSize", "spark.akka.timeout" and "spark.akka.askTimeout" (this last one does not even appear on Spark documentation, but was mentioned in the Spark mailing list), to no avail. There is still some timeout going on at 30 seconds that I have no clue how to identify or fix.

I see no reason for this to fail due to data size, as the filtering operation and the fact that aggregateByKey performs local partial aggregations should be enough to address the data size. The number of tasks is 16K (automatic from the original input), much more than the 800 cores that are running this, on 100 executors, so it is not as simple as the usual "increment partitions" tip. Any clues would be greatly appreciated! Thanks!

like image 896
Daniel Langdon Avatar asked Sep 09 '15 18:09

Daniel Langdon


People also ask

What happens if a spark executor fails?

As a result, the FileAlreadyExistsException error occurs. When any Spark executor fails, Spark retries to start the task, which might result into FileAlreadyExistsException error after the maximum number of retries.


2 Answers

I had a similar issue, that my job would work fine with a smaller dataset, but will fail with larger ones.

After a lot of configuration changes, I found that the changing the driver memory settings has much more of an impact than changing the executor memory settings. Also using the new garbage collector helps a lot. I am using the following configuration for a cluster of 3, with 40 cores each. Hope the following config helps:

spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -  
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g 
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions

spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g   
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions


spark.driver.memory=8g
spark.driver.cores=10
spark.driver.maxResultSize=8g

spark.executor.memory=16g
spark.executor.cores=25

spark.default.parallelism=50
spark.eventLog.dir=hdfs://mars02-db01/opt/spark/logs
spark.eventLog.enabled=true

spark.kryoserializer.buffer=512m
spark.kryoserializer.buffer.max=1536m

spark.rdd.compress=true
spark.storage.memoryFraction=0.15
spark.storage.MemoryStore=12g
like image 118
sparkDabbler Avatar answered Oct 30 '22 12:10

sparkDabbler


What's going on in the driver at the time of this failure? It could be due to memory pressure on the driver causing it to be unresponsive. If I recall correctly, the MapOutputTracker that it's trying to get to when it calls GetMapOutputStatuses is running in the Spark driver driver process.

If you're facing long GCs or other pauses for some reason in that process this would cause the exceptions you're seeing above.

Some things to try would be to try jstacking the driver process when you start seeing these errors and see what happens. If jstack doesn't respond, it could be that your driver isn't sufficiently responsive.

16K tasks does sound like it would be a lot for the driver to keep track of, any chance you can increase the driver memory past 4g?

like image 2
Hamel Kothari Avatar answered Oct 30 '22 13:10

Hamel Kothari