
Mind blown: RDD.zip() method

Tags:

apache-spark

I just discovered the RDD.zip() method and I cannot imagine what its contract could possibly be.

I understand what it does, of course. However, it has always been my understanding that

  • the order of elements in an RDD is a meaningless concept
  • the number of partitions and their sizes is an implementation detail only available to the user for performance tuning

In other words, an RDD is a (multi)set, not a sequence (and, of course, in Python, for example, one gets AttributeError: 'set' object has no attribute 'zip').

What is wrong with my understanding above?

What was the rationale behind this method?

Is it legal outside the trivial context like a.map(f).zip(a)?

EDIT 1:

  • Another crazy method is zipWithIndex(), as well as the various zipPartitions() variants.
  • Note that first() and take() are not crazy because they are just (non-random) samples of the RDD.
  • collect() is also okay - it just converts a set to a sequence which is perfectly legit.

EDIT 2: The reply says:

when you compute one RDD from another the order of elements in the new RDD may not correspond to that in the old one.

This appears to imply that even the trivial a.map(f).zip(a) is not guaranteed to be equivalent to a.map(x => (f(x), x)). Under what circumstances are zip() results reproducible?
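To make the comparison concrete, here is a minimal sketch of the two expressions in question (assuming an existing SparkContext sc; the RDD a and the function f are placeholders):

    // assuming an existing SparkContext `sc`; `a` and `f` are placeholders
    val a = sc.parallelize(1 to 5)
    val f = (x: Int) => x * x

    // version 1: zip the mapped RDD back with the original
    val zipped = a.map(f).zip(a)          // pairs (f(x), x)?

    // version 2: build the pairs inside a single map
    val paired = a.map(x => (f(x), x))    // pairs (f(x), x)

    // the question: are zipped.collect() and paired.collect()
    // guaranteed to be equal?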

asked Mar 25 '15 by sds



2 Answers

It is not true that RDDs are always unordered. An RDD has a guaranteed order if it is the result of a sortBy operation, for example. An RDD is not a set; it can contain duplicates. Partitioning is not opaque to the caller, and can be controlled and queried. Many operations do preserve both partitioning and order, like map. That said, I find it a little too easy to accidentally violate the assumptions that zip depends on, since they're subtle, but it certainly has a purpose.
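As a rough illustration of those points (a sketch, assuming an existing SparkContext sc):

    // an RDD produced by sortBy has a guaranteed order
    val sorted = sc.parallelize(Seq(3, 1, 2)).sortBy(identity)
    // sorted.collect() == Array(1, 2, 3)

    // map preserves partitioning and per-partition order, so this zip is safe:
    // both sides have the same partitions and the same element count per partition
    val doubled = sorted.map(_ * 2)
    val pairs   = doubled.zip(sorted)     // (2,1), (4,2), (6,3)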

answered Nov 10 '22 by Sean Owen


The mental model I use (and recommend) is that the elements of an RDD are ordered, but when you compute one RDD from another the order of elements in the new RDD may not correspond to that in the old one.

For those who want to be aware of partitions, I'd say that:

  1. The partitions of an RDD have an order.
  2. The elements within a partition have an order.
  3. If you think of "concatenating" the partitions (say laying them "end to end" in order) using the order of elements within them, the overall ordering you end up with corresponds to the order of elements if you ignore partitions.

But again, if you compute one RDD from another, all bets about the order relationships of the two RDDs are off.
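A small sketch of points 1-3 above, assuming an existing SparkContext sc (glom() turns each partition into an array, keeping partition order):

    val rdd = sc.parallelize(1 to 10, 3)

    // one array per partition, in partition order
    val perPartition: Array[Array[Int]] = rdd.glom().collect()

    // concatenating the partitions in order reproduces the overall element order
    val concatenated: Array[Int] = perPartition.flatten
    assert(concatenated.sameElements(rdd.collect()))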

Several members of the RDD class (I'm referring to the Scala API) strongly suggest an order concept (as does their documentation):

collect()
first()
partitions
take()
zipWithIndex()

as does Partition.index as well as SparkContext.parallelize() and SparkContext.makeRDD() (which both take a Seq[T]).

In my experience these ways of "observing" order give results that are consistent with each other, and the ones that translate back and forth between RDDs and ordered Scala collections behave as you would expect -- they preserve the overall order of elements. This is why I say that, in practice, RDDs have a meaningful order concept.
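For instance (a sketch, assuming an existing SparkContext sc), the observations tend to agree like this:

    val data = Seq("a", "b", "c", "d")
    val rdd  = sc.parallelize(data, 2)

    // collect() round-trips the original Seq order in practice
    // rdd.collect() == Array("a", "b", "c", "d")

    // zipWithIndex() assigns indices consistent with that same order
    // rdd.zipWithIndex().collect() == Array(("a",0), ("b",1), ("c",2), ("d",3))

    // first() and take(n) return prefixes of that order
    // rdd.take(2) == Array("a", "b")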

Furthermore, while there are obviously many situations where computing an RDD from another must change the order, in my experience order tends to be preserved where it is possible/reasonable to do so. Operations that don't re-partition and don't fundamentally change the set of elements especially tend to preserve order.
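Conversely, a quick sketch of how re-partitioning breaks the assumptions zip relies on (again assuming sc; the error message is paraphrased):

    val a = sc.parallelize(1 to 100, 4)

    // map keeps the partitioning, so this satisfies zip's requirements
    val ok = a.map(_ + 1).zip(a)

    // repartition changes the number of partitions (and shuffles elements),
    // so zipping the result with the original fails at runtime with
    // an error along the lines of "can't zip RDDs with unequal numbers of partitions"
    // val bad = a.repartition(8).zip(a)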

But this brings me to your question about "contract", and indeed the documentation has a problem in this regard. I have not seen a single place where an operation's effect on element order is made clear. (The OrderedRDDFunctions class doesn't count, because it refers to an ordering based on the data, which may differ from the raw order of elements within the RDD. Likewise the RangePartitioner class.) I can see how this might lead you to conclude that there is no concept of element order, but the examples I've given above make that model unsatisfying to me.

answered Nov 10 '22 by Spiro Michaylov