
How long does RDD remain in memory?

Since memory is limited, I assumed that Spark automatically removes RDDs from each node. Is the time after which this happens configurable? How does Spark decide when to evict an RDD from memory?

Note: I'm not talking about rdd.cache()

Faiz Halde asked Mar 31 '17 11:03


2 Answers

Is the time after which this happens configurable? How does Spark decide when to evict an RDD from memory?

An RDD is an object just like any other. If you don't persist/cache it, it behaves like any other object in a managed language and is garbage-collected once no live root objects reference it.

The "how" part, as @Jacek points out, is the responsibility of an object called ContextCleaner. If you want the details, this is what the cleaning method looks like:

private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
  while (!stopped) {
    try {
      // Wait up to the poll timeout for the next reference placed on the queue
      val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
      // Synchronize here to avoid being interrupted on stop()
      synchronized {
        reference.foreach { ref =>
          logDebug("Got cleaning task " + ref.task)
          // Dispatch on the kind of resource that became unreachable
          ref.task match {
            case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
            case CleanShuffle(shuffleId) =>
              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
            case CleanBroadcast(broadcastId) =>
              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
            case CleanAccum(accId) =>
              doCleanupAccum(accId, blocking = blockOnCleanupTasks)
            case CleanCheckpoint(rddId) =>
              // checkpoint cleanup elided here
          }
        }
      }
    } catch {
      case ie: InterruptedException if stopped => // ignore
      case e: Exception => logError("Error in cleaning thread", e)
    }
  }
}
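
The loop above polls a reference queue and dispatches on the task attached to each reference. As a rough illustration of that pattern, here is a minimal, standard-library-only sketch. MiniCleaner, TaskRef and the two-case CleanupTask hierarchy are hypothetical stand-ins, not Spark's classes, and the "cleanup" is just recorded in a list:

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

// Hypothetical, simplified stand-ins for the cleanup tasks seen in the listing.
sealed trait CleanupTask
case class CleanRDD(rddId: Int) extends CleanupTask
case class CleanShuffle(shuffleId: Int) extends CleanupTask

// A weak reference that remembers which cleanup to run once its referent is gone.
class TaskRef(val task: CleanupTask, referent: AnyRef, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

object MiniCleaner {
  private val queue = new ReferenceQueue[AnyRef]
  // Keep the TaskRefs themselves strongly reachable until they are processed.
  private val pending = mutable.Set.empty[TaskRef]
  // Record of the cleanups performed (a real cleaner would free resources here).
  val cleaned = mutable.ListBuffer.empty[String]

  def register(obj: AnyRef, task: CleanupTask): TaskRef = {
    val ref = new TaskRef(task, obj, queue)
    pending += ref
    ref
  }

  // One iteration of the polling loop: wait up to timeoutMs for an enqueued reference.
  def pollOnce(timeoutMs: Long): Option[CleanupTask] = {
    require(timeoutMs > 0, "ReferenceQueue.remove(0) would block indefinitely")
    Option(queue.remove(timeoutMs)).collect { case ref: TaskRef =>
      pending -= ref
      ref.task match {
        case CleanRDD(id)     => cleaned += s"rdd-$id"
        case CleanShuffle(id) => cleaned += s"shuffle-$id"
      }
      ref.task
    }
  }
}

// Usage: `handle` plays the role of an un-persisted RDD object on the driver.
val handle = new Object
val ref = MiniCleaner.register(handle, CleanRDD(42))
// Normally the GC enqueues the reference once `handle` is unreachable;
// calling enqueue() by hand simulates that step deterministically.
ref.enqueue()
val task = MiniCleaner.pollOnce(100L)
```

Because enqueueing normally happens only when the JVM garbage collector actually runs, there is no fixed "time" after which an unreferenced object is cleaned up; the sketch sidesteps this by enqueueing manually.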

If you want to learn more, I suggest browsing Spark's source or, even better, reading @Jacek's book "Mastering Apache Spark", which includes an explanation of ContextCleaner.

Yuval Itzchakov answered Jan 25 '23 14:01


In general, it is as Yuval Itzchakov wrote, "just like any other object", but... (there's always a "but", isn't there?)

In Spark, it's not that obvious, since we have shuffle blocks (among the other blocks managed by Spark). They are managed by BlockManagers running on executors. They somehow have to be notified when an object on the driver gets evicted from memory, right?

That's where ContextCleaner enters the stage. It is a Spark application's garbage collector, responsible for application-wide cleanup of shuffles, RDDs, broadcasts, accumulators and checkpointed RDDs, with the aim of reducing the memory requirements of long-running, data-heavy Spark applications.

ContextCleaner runs on the driver. It is created and immediately started when SparkContext starts (provided the spark.cleaner.referenceTracking property is enabled, which it is by default). It is stopped when SparkContext is stopped.

You can see it working by taking a thread dump of a Spark application with jconsole or jstack. ContextCleaner uses a daemon thread named Spark Context Cleaner that cleans RDD, shuffle, and broadcast state.
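
As for the "is this time configurable?" part of the question, the cleaner-related properties can be set in conf/spark-defaults.conf. The fragment below is only a sketch: the values are what I understand the documented defaults to be, so treat them as assumptions and check the configuration page for your Spark version. Of these, spark.cleaner.periodicGC.interval is the closest thing to a configurable time, since it controls how often a GC is triggered on the driver so that unreachable objects are noticed at all.

```
# Assumed defaults; verify against the docs for your Spark version
spark.cleaner.referenceTracking                   true
spark.cleaner.referenceTracking.blocking          true
spark.cleaner.referenceTracking.blocking.shuffle  false
spark.cleaner.periodicGC.interval                 30min
```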

You can also see its work by enabling the INFO or DEBUG logging level for the org.apache.spark.ContextCleaner logger. Just add the following line to conf/log4j.properties:
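
The line itself did not survive in this copy of the answer. Assuming the log4j 1.x properties syntax that matches the conf/log4j.properties file named above, it would look something like this (with INFO in place of DEBUG for less detail):

```
# Assumes log4j 1.x properties syntax; newer Spark versions use a different logging config format
log4j.logger.org.apache.spark.ContextCleaner=DEBUG
```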

Jacek Laskowski answered Jan 25 '23 16:01
