I have a very large RDD that I am caching (it still fits into memory), but since it is so big, I want to unpersist it as soon as possible. However, when I call unpersist on it, it causes an RPC timeout error:
17/11/21 23:25:55 INFO BlockManager: Removing RDD 171
Exception in thread "main" org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:135)
at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1793)
at org.apache.spark.rdd.RDD.unpersist(RDD.scala:216)
17/11/21 23:27:55 WARN BlockManagerMaster: Failed to remove RDD 171 - Cannot receive any reply from null in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from null in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
The code that is triggering this error looks like the following:
val transformation1 = firstTransformation(inputData).cache
log("Transformation1 Count: " + transformation1.count)
val transformation2 = secondTransformation(transformation1).cache
transformation1.unpersist()
Unpersisting an RDD should be a relatively inexpensive action. How can unpersisting an RDD cause an RPC timeout?
A slightly more comprehensive answer, since this is most likely a version-dependent issue you are encountering - things have changed:
From the JIRA:
The RDD and DataFrame .unpersist() method, and Broadcast .destroy() method, take an optional 'blocking' argument. The default was 'false' in all cases except for (Scala) RDDs and their GraphX subclasses.
The default is now 'false' (non-blocking) in all of these methods.
Pyspark's RDD and Broadcast classes now have an optional 'blocking' argument as well, with the same behavior.
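In practice this suggests two workarounds, depending on which Spark version you are on: pass blocking = false to unpersist so the driver does not wait for every executor to acknowledge the block removal, or raise spark.rpc.askTimeout if you really need the blocking behaviour. A minimal sketch of both (the 300s value, the local master, and the map/filter stand-ins for firstTransformation/secondTransformation are illustrative, not from the question):

import org.apache.spark.{SparkConf, SparkContext}

object UnpersistSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("unpersist-sketch")
      .setMaster("local[*]")                 // illustrative; use your real master
      .set("spark.rpc.askTimeout", "300s")   // option 2: raise the timeout (300s is an arbitrary example)
    val sc = new SparkContext(conf)

    // Stand-ins for the question's firstTransformation/secondTransformation.
    val inputData = sc.parallelize(1 to 1000000)
    val transformation1 = inputData.map(_ * 2).cache()
    println("Transformation1 Count: " + transformation1.count())

    val transformation2 = transformation1.filter(_ % 3 == 0).cache()
    transformation2.count()

    // Option 1: non-blocking removal - returns immediately instead of waiting
    // (up to spark.rpc.askTimeout) for every executor to acknowledge removal.
    transformation1.unpersist(blocking = false)

    sc.stop()
  }
}

On versions where the (Scala) RDD default was still blocking = true, passing blocking = false is usually enough to make the unpersist call cheap again; the blocks are then removed asynchronously in the background.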