Where is my sparkDF.persist(DISK_ONLY) data stored?

I want to understand more about Spark's persistence strategy when running on Hadoop.

When I persist a DataFrame with the DISK_ONLY strategy, where is my data stored (path/folder...)? And where do I specify this location?
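For context, this is roughly the call I mean (a minimal Scala sketch; the app name and DataFrame contents are just placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("persist-example").getOrCreate()
val sparkDF = spark.range(0, 1000000).toDF("id")

// Mark the DataFrame for on-disk persistence; nothing is written until an action runs.
sparkDF.persist(StorageLevel.DISK_ONLY)
sparkDF.count()  // triggers the actual write to Spark's scratch space on disk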

asked Jan 24 '18 by maffe


People also ask

Where does Spark store persist data?

To persist an RDD, we use the persist() method. Apache Spark can be used from Scala, Python, Java and other languages. When working with Java and Scala, persist() by default stores the data in the JVM as unserialized objects.

How does persist work in Spark?

When you persist a dataset, each node stores its partitioned data in memory and reuses it in other actions on that dataset. Spark's persisted data on nodes is also fault-tolerant: if any partition of a Dataset is lost, it will automatically be recomputed using the original transformations that created it.
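As an illustration (a minimal Scala sketch, assuming an existing SparkSession named spark and a made-up input path), a persisted dataset is materialized by the first action and then reused by later ones:

import org.apache.spark.storage.StorageLevel

val errors = spark.read.textFile("/data/logs")   // placeholder path
  .filter(_.contains("ERROR"))
  .persist(StorageLevel.MEMORY_AND_DISK)

val total = errors.count()              // first action computes and stores the partitions
val unique = errors.distinct().count()  // reuses the stored partitions instead of re-reading the input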

Where is RDD data stored?

RDDs store data in memory for fast access during computation and provide fault tolerance [110]. An RDD is an immutable distributed collection of data records, partitioned across the nodes in the cluster.

Does Spark have its own storage?

Spark is an open source framework focused on interactive query, machine learning, and real-time workloads. It does not have its own storage system, but runs analytics on other storage systems like HDFS, or other popular stores like Amazon Redshift, Amazon S3, Couchbase, Cassandra, and others.
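For example (a hedged sketch; the paths, bucket name and column are placeholders, and an existing SparkSession named spark is assumed), the same job can read from and write back to external storage systems:

val events = spark.read.parquet("hdfs:///warehouse/events")  // data lives in HDFS, not in Spark
events.groupBy("userId").count()
  .write.parquet("s3a://my-bucket/aggregates/")              // results written back to S3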


2 Answers

For the short answer we can just have a look at the documentation regarding spark.local.dir:

Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks. NOTE: In Spark 1.0 and later this will be overridden by SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) environment variables set by the cluster manager.
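As a quick sketch of how that can be set (the app name and directory paths are placeholders, and keep in mind the note above that on a cluster manager SPARK_LOCAL_DIRS or LOCAL_DIRS will override it):

import org.apache.spark.sql.SparkSession

// spark.local.dir accepts a comma-separated list of fast local disks (placeholder paths).
val spark = SparkSession.builder()
  .appName("disk-only-persist")
  .config("spark.local.dir", "/mnt/disk1/spark-scratch,/mnt/disk2/spark-scratch")
  .getOrCreate()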

For a more in-depth understanding we can look at the code: a DataFrame (which is just a Dataset[Row]) is based on RDDs and leverages the same persistence mechanisms. An RDD delegates persistence to the SparkContext, which marks it for persistence. The task is then actually handled by several classes in the org.apache.spark.storage package: the BlockManager manages the chunks of data to be persisted and the policy for doing so, delegating the actual persistence (when writing to disk, of course) to a DiskStore, which represents a high-level interface for writing and in turn uses a DiskBlockManager for the lower-level operations.

Hopefully you now know where to look, so we can move on and see where the data is actually persisted, and how we can even configure it: the DiskBlockManager invokes the helper Utils.getConfiguredLocalDirs, which for practicality I'm going to copy here (taken from the linked 2.2.1 version, the latest release at the time of writing):

def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
    val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
    if (isRunningInYarnContainer(conf)) {
        // If we are in yarn mode, systems can have different disk layouts so we must set it
        // to what Yarn on this system said was available. Note this assumes that Yarn has
        // created the directories already, and that they are secured so that only the
        // user has access to them.
        getYarnLocalDirs(conf).split(",")
    } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
        conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
    } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
        conf.getenv("SPARK_LOCAL_DIRS").split(",")
    } else if (conf.getenv("MESOS_DIRECTORY") != null && !shuffleServiceEnabled) {
        // Mesos already creates a directory per Mesos task. Spark should use that directory
        // instead so all temporary files are automatically cleaned up when the Mesos task ends.
        // Note that we don't want this if the shuffle service is enabled because we want to
        // continue to serve shuffle files after the executors that wrote them have already exited.
        Array(conf.getenv("MESOS_DIRECTORY"))
    } else {
        if (conf.getenv("MESOS_DIRECTORY") != null && shuffleServiceEnabled) {
        logInfo("MESOS_DIRECTORY available but not using provided Mesos sandbox because " +
            "spark.shuffle.service.enabled is enabled.")
        }
        // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
        // configuration to point to a secure directory. So create a subdirectory with restricted
        // permissions under each listed directory.
        conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",")
    }
}

The code, I believe, is pretty self-explanatory and well commented (and perfectly matches the contents of the documentation): when running on YARN there is a specific policy that relies on the storage of the YARN containers, on Mesos it uses the Mesos sandbox (unless the shuffle service is enabled), and in all other cases it goes to the location set under spark.local.dir or, alternatively, java.io.tmpdir (which is likely to be /tmp/).

So, if you are just playing around, the data is most likely stored under /tmp/; otherwise it depends a lot on your environment and configuration.
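As a quick check (a hedged sketch, assuming a local run and an existing SparkSession named spark), you can print the resolved scratch root and look for the blockmgr-* directories Spark creates under it after persisting with DISK_ONLY and running an action:

// spark.local.dir if set, otherwise java.io.tmpdir (typically /tmp).
val scratch = spark.sparkContext.getConf
  .get("spark.local.dir", System.getProperty("java.io.tmpdir"))
println(s"Persisted blocks end up in blockmgr-* directories under: $scratch")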

answered Sep 19 '22 by stefanobaghino


To sum it up for my YARN environment:

With the guidance of @stefanobaghino I was able to go one step further in the code, to the point where the YARN config is loaded:

val localDirs = Option(conf.getenv("LOCAL_DIRS")).getOrElse("")

LOCAL_DIRS is set via the yarn.nodemanager.local-dirs option in yarn-default.xml.
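For reference, on the NodeManager hosts this is typically overridden in yarn-site.xml, roughly like this (a sketch; the second path is just a placeholder, the first one matches the directory from the error message below):

<property>
  <name>yarn.nodemanager.local-dirs</name>
  <value>/data/1/yarn/local,/data/2/yarn/local</value>
</property>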

The background for my question is that, due to the error

2018-01-23 16:57:35,229 WARN org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection: Directory /data/1/yarn/local error, used space above threshold of 98.5%, removing from list of valid directories

my Spark job sometimes got killed, and I wanted to understand whether this disk is also used for my persisted data while running the job (which is actually a massive amount of data).

So it turns out that this is exactly the folder the data goes to when it is persisted with a DISK_ONLY strategy.
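As a side note (a minimal sketch, reusing sparkDF from the question): explicitly unpersisting the DataFrame once it is no longer needed frees those on-disk blocks in the YARN local dirs instead of keeping them around for the rest of the job:

import org.apache.spark.storage.StorageLevel

sparkDF.persist(StorageLevel.DISK_ONLY)
sparkDF.count()      // materializes the blocks under the YARN local dirs
// ... further actions that reuse sparkDF ...
sparkDF.unpersist()  // drops the persisted blocks and releases the disk space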

Thanks a lot for all your helpful guidance with this problem!

answered Sep 21 '22 by maffe