 

In Apache Spark, why does RDD.union not preserve the partitioner?

As everyone knows, partitioners in Spark have a huge performance impact on any "wide" operation, so they are often customized. I was experimenting with the following code:

import org.apache.spark.HashPartitioner

val rdd1 = sc.parallelize(1 to 50).keyBy(_ % 10)
  .partitionBy(new HashPartitioner(10))
val rdd2 = sc.parallelize(200 to 230).keyBy(_ % 13)

val cogrouped = rdd1.cogroup(rdd2)
println("cogrouped: " + cogrouped.partitioner)

val unioned = rdd1.union(rdd2)
println("union: " + unioned.partitioner)

I see that by default cogroup() always yields an RDD with the customized partitioner, but union() doesn't; it always reverts to the default. This is counterintuitive, as we usually assume that a PairRDD should use its first element as the partition key. Is there a way to "force" Spark to merge two PairRDDs using the same partition key?

tribbloid asked Apr 30 '15


People also ask

Does RDD preserve order?

If you read a file with sc.textFile, the lines of the RDD will be in the order that they were in the file. map, filter, flatMap, and coalesce (with shuffle = false) preserve the order: like most RDD operations, they work on iterators inside the partitions, so they simply have no opportunity to mess up the order.
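
A minimal sketch of this behavior (input.txt is a hypothetical file; sc is an existing SparkContext):

val lines = sc.textFile("input.txt")      // lines in file order
val upper = lines.map(_.toUpperCase)      // map preserves order within partitions
val nonEmpty = upper.filter(_.nonEmpty)   // so does filter
nonEmpty.collect().foreach(println)       // prints in the original file order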

Does Union cause shuffle?

repartition, join, cogroup, and any of the *By or *ByKey transformations can result in shuffles. map, filter, and union generate only a single stage (no shuffling).
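
One way to check this is toDebugString, which prints an RDD's lineage with its stage boundaries (a minimal sketch using rdd1 and rdd2 from the question):

println(rdd1.union(rdd2).toDebugString)          // one stage: a UnionRDD over the inputs
println(rdd2.reduceByKey(_ + _).toDebugString)   // two stages: a ShuffledRDD appears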

How does union work in Spark?

Union is a transformation in Spark used to combine multiple data frames. Called on one data frame with another as its argument, it returns a new data frame containing the rows of both inputs.
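
In code, assuming Spark 2.x or later where Dataset.union is available (a minimal sketch; spark is an existing SparkSession, and union matches columns by position, not by name):

val df1 = spark.range(0, 3).toDF("id")
val df2 = spark.range(10, 13).toDF("id")
df1.union(df2).show()   // six rows; duplicates are kept (use distinct() for SQL UNION semantics)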

Why RDDs are fault tolerant and immutable?

RDDs are fault-tolerant because they track data lineage information, which allows lost data to be rebuilt automatically on failure. To achieve fault tolerance for a generated RDD, its data is replicated among various Spark executors on worker nodes in the cluster.

What is RDD persistence in Apache Spark?

Spark RDD persistence is an optimization technique that saves the result of an RDD evaluation. We save the intermediate result so that we can reuse it if required, which reduces computation overhead. An RDD can be persisted through the cache() and persist() methods; cache() stores the RDD in memory.
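
A minimal sketch of both methods (StorageLevel lives in org.apache.spark.storage; the RDD and numbers here are illustrative):

import org.apache.spark.storage.StorageLevel

val data = sc.parallelize(1 to 1000000).map(_ * 2)
data.persist(StorageLevel.MEMORY_AND_DISK)   // cache() is shorthand for persist(MEMORY_ONLY)
println(data.count())   // first action: computes the RDD and materializes the cache
println(data.sum())     // second action: served from the cache, no recomputation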

How to recover lost RDD in spark?

When an RDD is computed for the first time, it is kept in memory on the node. Spark's cache is fault-tolerant: whenever any partition of an RDD is lost, it can be recovered by re-running the transformations that originally created it. Persistence is worthwhile because in Spark we often use the same RDD multiple times.

Why do we need RDDs in spark?

For these reasons, a lot of organizations have migrated their big data applications to Spark, and the first thing they learn is how to use RDDs. This makes sense, as the RDD is the building block of Spark and the whole idea of Spark is based on it. It is also a natural replacement for MapReduce.

What is the basic data structure of Apache Spark?

RDD is the fundamental data structure of Apache Spark: a read-only, partitioned collection of records. It can only be created through deterministic operations on data in stable storage, on other RDDs, or by parallelizing an already existing collection in the driver program.
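
A minimal sketch of the three creation paths (the HDFS path is hypothetical):

val fromFile = sc.textFile("hdfs:///data/input.txt")   // 1. data in stable storage
val fromRdd = fromFile.map(_.length)                   // 2. deterministic operation on another RDD
val fromCollection = sc.parallelize(Seq(1, 2, 3, 4))   // 3. parallelizing a driver-side collection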


1 Answer

union is a very efficient operation because it doesn't move any data around. If rdd1 has 10 partitions and rdd2 has 20 partitions, then rdd1.union(rdd2) will have 30 partitions: the partitions of the two RDDs placed one after the other. This is just a bookkeeping change; there is no shuffle.
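
This bookkeeping is easy to observe with the RDDs from the question (a minimal sketch; rdd2's count depends on sc.defaultParallelism):

println(rdd1.getNumPartitions)               // 10, set by HashPartitioner(10)
println(rdd2.getNumPartitions)               // whatever sc.defaultParallelism gave it
println(rdd1.union(rdd2).getNumPartitions)   // the sum of the two counts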

But this necessarily discards the partitioner. A partitioner is constructed for a given number of partitions, and the resulting RDD has a number of partitions different from both rdd1 and rdd2.

After taking the union you can run repartition to shuffle the data and organize it by key.
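
For a key-value RDD, partitionBy is the more direct tool, since it both shuffles and records the partitioner on the result (a minimal sketch continuing the question's code):

val byKey = unioned.partitionBy(new HashPartitioner(10))   // shuffles and sets the partitioner
println(byKey.partitioner)   // Some(org.apache.spark.HashPartitioner@...)
// plain repartition(10) would also shuffle, but would leave the partitioner as None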


There is one exception to the above. If rdd1 and rdd2 have the same partitioner (with the same number of partitions), union behaves differently. It will join the partitions of the two RDDs pairwise, giving it the same number of partitions as each of the inputs had. This may involve moving data around (if the partitions were not co-located) but will not involve a shuffle. In this case the partitioner is retained. (The code for this is in PartitionerAwareUnionRDD.scala.)
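
This case is easy to reproduce: give both inputs the same partitioner before the union (a minimal sketch):

val p = new HashPartitioner(10)
val a = sc.parallelize(1 to 50).keyBy(_ % 10).partitionBy(p)
val b = sc.parallelize(200 to 230).keyBy(_ % 13).partitionBy(p)
val u = a.union(b)
println(u.partitioner)        // Some(p): retained via PartitionerAwareUnionRDD
println(u.getNumPartitions)   // 10, same as each input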

Daniel Darabos answered Sep 25 '22