
Spark RDDs - how do they work?

I have a small Scala program that runs fine on a single node. However, I am scaling it out so it runs on multiple nodes. This is my first such attempt. I am just trying to understand how RDDs work in Spark, so this question is based on theory and may not be 100% correct.

Let's say I create an RDD: val rdd = sc.textFile(file)

Now once I've done that, does that mean that the file at file is now partitioned across the nodes (assuming all nodes have access to the file path)?

Secondly, I want to count the number of objects in the RDD (simple enough); however, I need to use that number in a calculation applied to the objects in the RDD - a pseudocode example:

rdd.map(x => x / rdd.size)

Let's say there are 100 objects in rdd, and say there are 10 nodes, so there would be 10 objects per node (assuming this is how the RDD concept works). Now, when I call the method, is each node going to perform the calculation with rdd.size as 10 or 100? Because, overall, the RDD is size 100, but locally on each node it is only 10. Am I required to make a broadcast variable prior to doing the calculation? This question is linked to the question below.

Finally, if I make a transformation to the RDD, e.g. rdd.map(_.split("-")), and then I want the new size of the RDD, do I need to perform an action on the RDD, such as count(), so all the information is sent back to the driver node?

asked Dec 12 '14 by monster

People also ask

How does Spark collect work?

Spark's collect() and collectAsList() are actions that retrieve all the elements of an RDD/DataFrame/Dataset (from all nodes) to the driver node. Use collect() only on a smaller dataset, usually after filter(), group(), count(), etc.; calling it on a large dataset can result in an out-of-memory error on the driver.
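For example (the file name is just a placeholder), a typical pattern is to filter first and only then collect:

// Hypothetical example: collect() brings every remaining element to the driver,
// so reduce the data first.
val errors = sc.textFile("data.txt").filter(_.startsWith("error"))
val collected: Array[String] = errors.collect()   // materialized on the driver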

How does Spark reconstruct the lost partitions in memory?

Lineage is the mechanism an RDD uses to reconstruct lost partitions. Spark does not replicate the data in memory; if data is lost, an RDD uses its lineage to rebuild it. Each RDD remembers how it was built from other datasets.

How does Spark RDD work?

The Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark. It is an immutable distributed collection of objects. Each RDD is divided into logical partitions, which may be computed on different nodes of the cluster.

How does Spark engine work?

Spark runs multi-threaded tasks inside JVM processes, whereas MapReduce runs each task as a heavier-weight JVM process. This gives Spark faster startup, better parallelism, and better CPU utilization. Spark also provides a richer functional programming model than MapReduce.


2 Answers

val rdd = sc.textFile(file)

Does that mean that the file is now partitioned across the nodes?

The file remains wherever it was. The elements of the resulting RDD[String] are the lines of the file. The RDD is partitioned to match the natural partitioning of the underlying file system. The number of partitions does not depend on the number of nodes you have.
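As a rough illustration (the path and numbers are made up), you can check how many partitions Spark created, and you will see that it tracks the input splits rather than the cluster size:

val rdd = sc.textFile("hdfs:///data/input.txt")        // hypothetical path
println(rdd.partitions.length)                         // follows the input splits (e.g. HDFS blocks)
val rdd8 = sc.textFile("hdfs:///data/input.txt", minPartitions = 8)  // ask for at least 8 partitions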

It is important to understand that when this line is executed it does not read the file(s). The RDD is a lazy object and will only do something when it must. This is great because it avoids unnecessary memory usage.

For example, if you write val errors = rdd.filter(line => line.startsWith("error")), still nothing happens. If you then write val errorCount = errors.count now your sequence of operations will need to be executed because the result of count is an integer. What each worker core (executor thread) will do in parallel then, is read a file (or piece of file), iterate through its lines, and count the lines starting with "error". Buffering and GC aside, only a single line per core will be in memory at a time. This makes it possible to work with very large data without using a lot of memory.
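A minimal sketch of that sequence (the file name is a placeholder):

val rdd = sc.textFile("logs.txt")                          // nothing is read yet
val errors = rdd.filter(line => line.startsWith("error"))  // still nothing happens
val errorCount = errors.count()                            // count is an action: only now is the file read and filtered
println(s"error lines: $errorCount")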

I want to count the number of objects in the RDD, however, I need to use that number in a calculation which needs to be applied to objects in the RDD - a pseudocode example:

rdd.map(x => x / rdd.size)

There is no rdd.size method. There is rdd.count, which counts the number of elements in the RDD. rdd.map(x => x / rdd.count) will not work. The code will try to send the rdd variable to all workers and will fail with a NotSerializableException. What you can do is:

val count = rdd.count
val normalized = rdd.map(x => x / count)

This works because count is a plain Long value and can be serialized into the closure.
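To make the pattern concrete, here is a self-contained toy example with a numeric RDD (the data is made up):

val numbers = sc.parallelize(Seq(2.0, 4.0, 6.0, 8.0))  // toy data
val count = numbers.count()                            // action: returns a Long to the driver
val normalized = numbers.map(x => x / count)           // count is a plain value, so it serializes fine
normalized.collect().foreach(println)                  // 0.5, 1.0, 1.5, 2.0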

If I make a transformation to the RDD, e.g. rdd.map(_.split("-")), and then I wanted the new size of the RDD, do I need to perform an action on the RDD, such as count(), so all the information is sent back to the driver node?

map does not change the number of elements. I don't know what you mean by "size". But yes, you need to perform an action, such as count, to get anything out of the RDD. You see, no work at all is performed until you perform an action. (When you perform count, only the per-partition counts will be sent back to the driver, of course, not "all the information".)
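For instance (toy data), a map changes each element but not how many there are; flatMap is what changes the element count:

val lines = sc.parallelize(Seq("a-b", "c-d-e"))   // toy data
val parts = lines.map(_.split("-"))               // RDD[Array[String]]
println(lines.count())                            // 2
println(parts.count())                            // still 2: map is one-to-one
println(lines.flatMap(_.split("-")).count())      // 5: total number of pieces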

answered Sep 17 '22 by Daniel Darabos


Usually, the file (or parts of the file, if it's too big) is replicated to N nodes in the cluster (by default N=3 on HDFS). There is no intention to split every file across all available nodes.

However, for you (i.e. the client), working with the file through Spark should be transparent: you should not see any difference in rdd.count, no matter how many nodes the file is split across and/or replicated on. There are methods (at least in Hadoop) to find out on which nodes (parts of) the file are currently located, but in simple cases you most probably won't need that functionality.
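If you ever do need to know where the blocks of a file live, the Hadoop FileSystem API exposes that; a rough sketch (the path and configuration are assumptions):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())                 // picks up the Hadoop configuration
val status = fs.getFileStatus(new Path("/data/input.txt"))   // hypothetical path
val blocks = fs.getFileBlockLocations(status, 0, status.getLen)
blocks.foreach(b => println(b.getHosts.mkString(", ")))      // hosts holding each block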

UPDATE: an article describing RDD internals: https://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf

answered Sep 21 '22 by Ashalynd