Since memory is limited, I have a feeling that Spark automatically removes RDDs from each node. Is the timing of this configurable? How does Spark decide when to evict an RDD from memory?
Note: I'm not talking about rdd.cache()
Regarding "is this time configurable? How does Spark decide when to evict an RDD from memory?":
An RDD is an object just like any other. If you don't persist/cache it, it behaves like any other object in a managed language and is collected once no live root objects point to it.
The "how" part, as @Jacek points out, is the responsibility of an object called ContextCleaner. If you want the details, this is what the cleaning method looks like:
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
  while (!stopped) {
    try {
      val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
        .map(_.asInstanceOf[CleanupTaskWeakReference])
      // Synchronize here to avoid being interrupted on stop()
      synchronized {
        reference.foreach { ref =>
          logDebug("Got cleaning task " + ref.task)
          referenceBuffer.remove(ref)
          ref.task match {
            case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
            case CleanShuffle(shuffleId) =>
              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
            case CleanBroadcast(broadcastId) =>
              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
            case CleanAccum(accId) =>
              doCleanupAccum(accId, blocking = blockOnCleanupTasks)
            case CleanCheckpoint(rddId) =>
              doCleanCheckpoint(rddId)
          }
        }
      }
    } catch {
      case ie: InterruptedException if stopped => // ignore
      case e: Exception => logError("Error in cleaning thread", e)
    }
  }
}
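To make the mechanism in that loop a bit more concrete, here is a minimal sketch of the underlying JVM pattern: a weak reference carries a cleanup task, and once the referent is collected the reference shows up on a queue where the task can be run. This is not Spark's code. The names mirror the snippet above, but CleanerSketch, register and drain are hypothetical, and only the JDK's java.lang.ref classes are used.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

// Hypothetical stand-ins for Spark's cleanup tasks (names borrowed from the snippet above).
sealed trait CleanupTask
final case class CleanRDD(rddId: Int) extends CleanupTask
final case class CleanShuffle(shuffleId: Int) extends CleanupTask

// A weak reference that remembers which cleanup to run once its referent is gone.
final class CleanupTaskWeakReference(
    val task: CleanupTask,
    referent: AnyRef,
    queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

object CleanerSketch {
  private val referenceQueue = new ReferenceQueue[AnyRef]
  // Strong references to the weak references themselves, so they are not
  // collected before their task has been processed.
  private val referenceBuffer = mutable.Set.empty[CleanupTaskWeakReference]
  // Records the tasks that have been handled (a real cleaner would act on them).
  val cleaned = mutable.ListBuffer.empty[CleanupTask]

  def register(obj: AnyRef, task: CleanupTask): CleanupTaskWeakReference = {
    val ref = new CleanupTaskWeakReference(task, obj, referenceQueue)
    referenceBuffer += ref
    ref
  }

  // Waits up to timeoutMs for the first enqueued reference, then drains the
  // rest without blocking. Returns the number of tasks handled.
  def drain(timeoutMs: Long = 100L): Int = {
    var handled = 0
    var next = referenceQueue.remove(timeoutMs)
    while (next != null) {
      val ref = next.asInstanceOf[CleanupTaskWeakReference]
      referenceBuffer -= ref
      cleaned += ref.task
      handled += 1
      next = referenceQueue.poll()
    }
    handled
  }
}
```

In a real run the garbage collector enqueues the reference at some point after the object becomes unreachable, so the timing is not deterministic. For experimenting you can call enqueue() on the returned reference to simulate that, then call CleanerSketch.drain().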
If you want to learn more, I suggest browsing Spark's source or, even better, reading @Jacek's book "Mastering Apache Spark", which includes an explanation of ContextCleaner.
In general, it works as Yuval Itzchakov wrote, "just like any other object", but... (there's always a "but", isn't there?)
In Spark, it's not that obvious, since we have shuffle blocks (among the other blocks managed by Spark). They are managed by BlockManagers running on executors. Those somehow have to be notified when an object on the driver gets evicted from memory, right?
That's where ContextCleaner comes on stage. It is a Spark application's garbage collector, responsible for application-wide cleanup of shuffles, RDDs, broadcasts, accumulators and checkpointed RDDs, with the aim of reducing the memory requirements of long-running, data-heavy Spark applications.
ContextCleaner runs on the driver. It is created and immediately started when SparkContext starts (provided the spark.cleaner.referenceTracking Spark property is enabled, which it is by default). It is stopped when SparkContext is stopped.
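On the "is this configurable?" part, a few related properties exist next to spark.cleaner.referenceTracking. The sketch below just collects them in a plain Scala Map and renders them as spark-submit arguments. The keys come from the Spark configuration docs, while the values are the defaults as I recall them, so treat them as assumptions and check the docs for your version. CleanerSettings and asSubmitArgs are hypothetical names for illustration.

```scala
object CleanerSettings {
  // Values are assumed defaults; verify them against your Spark version.
  val settings: Map[String, String] = Map(
    // Turns ContextCleaner on or off.
    "spark.cleaner.referenceTracking" -> "true",
    // Whether the cleaning thread blocks on cleanup tasks (other than shuffle).
    "spark.cleaner.referenceTracking.blocking" -> "true",
    // Whether the cleaning thread blocks on shuffle cleanup tasks.
    "spark.cleaner.referenceTracking.blocking.shuffle" -> "false",
    // Whether checkpoint files are cleaned once the reference is out of scope.
    "spark.cleaner.referenceTracking.cleanCheckpoints" -> "false",
    // How often a GC is triggered on the driver so that weak references get noticed.
    "spark.cleaner.periodicGC.interval" -> "30min"
  )

  // Renders the settings as `--conf key=value` pairs, sorted by key.
  def asSubmitArgs: Seq[String] =
    settings.toSeq.sortBy(_._1).flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```

Since cleanup only happens after the driver's GC has noticed that an RDD is unreachable, spark.cleaner.periodicGC.interval is probably the closest thing to a "time" setting here.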
You can see it at work by taking a thread dump of a Spark application with jconsole or jstack. ContextCleaner uses a daemon thread named Spark Context Cleaner that cleans RDD, shuffle, and broadcast state.
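If you'd rather check from inside the JVM than attach jconsole or jstack, something like the following should work. It is only a sketch using the plain JDK thread API. ThreadFinder and threadsNamed are hypothetical names, and the thread will only be found when the code runs in the driver JVM (for example in spark-shell).

```scala
object ThreadFinder {
  // Returns the live JVM threads whose name contains the given fragment.
  def threadsNamed(fragment: String): Seq[Thread] =
    Thread.getAllStackTraces.keySet
      .toArray(new Array[Thread](0))
      .filter(_.getName.contains(fragment))
      .toSeq
}
```

For example, ThreadFinder.threadsNamed("Spark Context Cleaner").foreach(t => println(s"${t.getName} daemon=${t.isDaemon}")) lists the cleaner thread, if there is one, along with its daemon flag.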
You can also see its work by enabling the INFO or DEBUG logging level for the org.apache.spark.ContextCleaner logger. Just add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.ContextCleaner=DEBUG