Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What are possible reasons for receiving TimeoutException: Futures timed out after [n seconds] when working with Spark [duplicate]

Tags:

I'm working on a Spark SQL program and I'm receiving the following exception:

16/11/07 15:58:25 ERROR yarn.ApplicationMaster: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds] java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)     at scala.concurrent.Await$.result(package.scala:190)     at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107)     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)     at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)     at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)     at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)     at scala.collection.immutable.List.foreach(List.scala:381)     at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)     at scala.collection.immutable.List.map(List.scala:285)     at org.apache.spark.sql.execution.Union.doExecute(basicOperators.scala:144)     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)     at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryColumnarTableScan.scala:129)     at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryColumnarTableScan.scala:118)     at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryColumnarTableScan.scala:41)     at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:93)     at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:60)     at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:84)     at org.apache.spark.sql.DataFrame.persist(DataFrame.scala:1581)     at org.apache.spark.sql.DataFrame.cache(DataFrame.scala:1590)     at com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56)     at com.somecompany.ml.modeling.NewModel.generateArtifacts(FlowForNewModel.scala:32)     at com.somecompany.ml.modeling.Flow$class.run(Flow.scala:52)     at com.somecompany.ml.modeling.lowForNewModel.run(FlowForNewModel.scala:15)     at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)     at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)     at scala.Option.getOrElse(Option.scala:121)     at com.somecompany.ml.Main$.main(Main.scala:46)     at com.somecompany.ml.Main.main(Main.scala)     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)     at java.lang.reflect.Method.invoke(Method.java:498)     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542) 16/11/07 15:58:25 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]) 

The last part of my code that I recognize from the stack trace is com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56) which gets me to this line: profilesDF.cache() Before the caching I perform a union between 2 dataframes. I've seen an answer about persisting both the dataframes before the join here I still need to cache the unioned dataframe since I'm using it in several of my transformations

And I was wondering what may cause this exception to be thrown? Searching for it got me to a link dealing with rpc timeout exception or some security issues which is not my problem If you also have any idea on how to solve it I'd obviously appreciate it but even just understanding the problem will help me solve it

Thanks in advance

like image 853
Gideon Avatar asked Nov 07 '16 20:11

Gideon


2 Answers

Question : I was wondering what may cause this exception to be thrown?

Answer :

spark.sql.broadcastTimeout 300 Timeout in seconds for the broadcast wait time in broadcast joins

spark.network.timeout 120s Default timeout for all network interactions.. spark.network.timeout (spark.rpc.askTimeout), spark.sql.broadcastTimeout, spark.kryoserializer.buffer.max(if you are using kryo serialization), etc. are tuned with larger-than-default values in order to handle complex queries. You can start with these values and adjust accordingly to your SQL workloads.

Note : Doc says that

The following options(see spark.sql. properties) can also be used to tune the performance of query execution. It is possible that these options will be deprecated in future release as more optimizations are performed automatically.*

Also,for your better understanding you can see BroadCastHashJoin where execute method is trigger point for the above stack trace.

protected override def doExecute(): RDD[Row] = {     val broadcastRelation = Await.result(broadcastFuture, timeout)      streamedPlan.execute().mapPartitions { streamedIter =>       hashJoin(streamedIter, broadcastRelation.value)     }   } 
like image 150
Ram Ghadiyaram Avatar answered Oct 05 '22 13:10

Ram Ghadiyaram


Good to know that the suggestion from Ram works in some cases. I'd like to mention that I stumbled on this exception a couple of times (including the one described here).

Much of the time, it was due to almost silent OOMs on some executor. Check on SparkUI for failed tasks, last column of this table: task panel for a stage in SparkUI You may notice OOM messages.

If understand well spark internals, the broadcasted data passes through the driver. So the driver has some thread mechanism to collect the data from executors, and send it back to all. If at some point an executor fails, you may end up with these timeouts.

like image 43
mathieu Avatar answered Oct 05 '22 12:10

mathieu