How can unpersisting an RDD cause an RPC timeout?

I have a very large RDD that I am caching (it still fits into memory), but since it is so big, I want to unpersist it as soon as possible. However, when I call unpersist on it, it causes an RPC timeout error:

17/11/21 23:25:55 INFO BlockManager: Removing RDD 171
Exception in thread "main" org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
        at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:135)
        at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1793)
        at org.apache.spark.rdd.RDD.unpersist(RDD.scala:216)

17/11/21 23:27:55 WARN BlockManagerMaster: Failed to remove RDD 171 - Cannot receive any reply from null in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from null in 120 seconds. This timeout is controlled by spark.rpc.askTimeout

The code that is triggering this error looks like the following:

val transformation1 = firstTransformation(inputData).cache()
log("Transformation1 Count: " + transformation1.count)
val transformation2 = secondTransformation(transformation1).cache()
transformation1.unpersist()

Unpersisting an RDD should be a relatively inexpensive action. How can unpersisting an RDD cause an RPC timeout?
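
For context, the timeout in the stack trace is controlled by spark.rpc.askTimeout, which can be raised while investigating. A minimal sketch of setting it when building the session; the application name and the 600s value are illustrative choices, not recommendations:

import org.apache.spark.sql.SparkSession

// Sketch: raise the RPC ask timeout so a slow, blocking block removal
// has more time to complete. 600s is only an illustrative value.
val spark = SparkSession.builder()
  .appName("unpersist-timeout-example")
  .config("spark.rpc.askTimeout", "600s")
  .getOrCreate()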

asked Nov 08 '22 by B. Smith
1 Answer

A slightly more comprehensive answer, since what you are hitting is most likely version-dependent; things have changed:

  • See https://issues.apache.org/jira/browse/SPARK-26771
    • Make .unpersist(), .destroy() consistently non-blocking by default

From the JIRA:

The RDD and DataFrame .unpersist() method, and Broadcast .destroy() method, take an optional 'blocking' argument. The default was 'false' in all cases except for (Scala) RDDs and their GraphX subclasses.

The default is now 'false' (non-blocking) in all of these methods.

PySpark's RDD and Broadcast classes now have an optional 'blocking' argument as well, with the same behavior.
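
Concretely, on Spark versions where the Scala RDD unpersist still blocks by default (pre-3.0), the call in the question can be made explicitly non-blocking; a minimal sketch against the code above:

// Sketch: blocking = false returns immediately and lets the block manager
// remove the cached partitions asynchronously, avoiding the RPC wait.
transformation1.unpersist(blocking = false)

// With blocking = true the driver waits for every executor to confirm
// removal, which is the call that can run into spark.rpc.askTimeout.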

answered Nov 15 '22 by thebluephantom