 

How does the distinct() function work in Spark?

I'm a newbie to Apache Spark and am learning its basic functionality. I have a small doubt: suppose I have an RDD of (key, value) tuples and want to obtain the unique ones, so I use the distinct() function. I'm wondering on what basis the function considers tuples to be distinct. Is it based on the keys, the values, or both?

asked Jun 20 '15 by preetham madeti

People also ask

What does distinct do in Spark?

distinct() returns a new DataFrame containing the distinct rows in this DataFrame. If you need to consider only a subset of the columns when dropping duplicates, you first have to make a column selection before calling distinct(), as shown below.
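For instance, a minimal Scala sketch of the idea (the SparkSession, column names, and sample rows below are made up for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("distinct-demo").getOrCreate()
import spark.implicits._

// toy data: the first two rows differ only in "age"
val df = Seq(
  ("alice", "NY", 30),
  ("alice", "NY", 31),
  ("bob",   "CA", 25)
).toDF("name", "state", "age")

df.distinct().show()                          // all 3 rows: no full-row duplicates
df.select("name", "state").distinct().show()  // 2 rows: dedup on the selected columns only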

How do I select distinct values in Spark?

Spark doesn't have a distinct method that takes the columns it should run distinct on; however, Spark provides another signature of the dropDuplicates() function which takes multiple columns to eliminate duplicates. Note that calling dropDuplicates() on a DataFrame returns a new DataFrame with the duplicate rows removed.
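A hedged example of that dropDuplicates signature, reusing the toy DataFrame from the sketch above:

// keeps every column, but only one (arbitrary) row per (name, state) combination
val deduped = df.dropDuplicates("name", "state")
deduped.show()

Unlike select(...).distinct(), the columns you did not deduplicate on (here, age) are still present in the result.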

How does PySpark distinct work?

The PySpark distinct() function is used to drop/remove duplicate rows (considering all columns) from a DataFrame, while dropDuplicates() is used to drop rows based on selected (one or multiple) columns.

How do I select distinct rows in Spark?

We create a DataFrame from a Python list by passing the list to the createDataFrame() method in PySpark, and then use the distinct() function to get the distinct rows from the DataFrame.
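That description is PySpark; the same idea sketched in Scala (the names and values below are invented, and spark is the SparkSession from the earlier sketch):

// build a DataFrame from a local collection, then keep only the distinct rows
val rows = Seq(("k1", 1), ("k1", 1), ("k2", 2))
val listDf = spark.createDataFrame(rows).toDF("key", "value")
listDf.distinct().show()   // the duplicated ("k1", 1) row appears only once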


2 Answers

.distinct() is definitely doing a shuffle across partitions. To see more of what's happening, run a .toDebugString on your RDD.

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

val hashPart = new HashPartitioner(<number of partitions>)

val myRDDPreStep = <load some RDD>

// deduplicate, repartition by key, name the RDD, and keep it cached on memory/disk
val myRDD = myRDDPreStep
  .distinct
  .partitionBy(hashPart)
  .setName("myRDD")
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

myRDD.checkpoint
println(myRDD.toDebugString)

which, for an example RDD of mine (where myRDDPreStep is already hash-partitioned by key, persisted with StorageLevel.MEMORY_AND_DISK_SER, and checkpointed), returns:

(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
 +-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
    |    ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
    +-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
        |    myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated]
        |        CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B
        |    myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]

Note that there may be more efficient ways to get distinct that involve fewer shuffles, ESPECIALLY if your RDD is already partitioned in a smart way and the partitions are not overly skewed.

See "Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct?" and "Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?"
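The idea behind the first linked question, sketched in Scala: if every copy of an element is already guaranteed to live in the same partition (for example, a pair RDD hash-partitioned by key, so identical (key, value) pairs co-reside), you can deduplicate within each partition and avoid the extra shuffle that distinct() triggers. This is only a sketch under that assumption, not a general replacement for distinct():

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Correct only when duplicates cannot span partitions; otherwise
// cross-partition duplicates survive. Note that toSet materializes
// each partition in memory, so partitions must fit.
def distinctWithinPartitions[T: ClassTag](rdd: RDD[T]): RDD[T] =
  rdd.mapPartitions(iter => iter.toSet.iterator, preservesPartitioning = true)

// e.g. on an RDD already partitioned like myRDDPreStep above:
// val myRDD = distinctWithinPartitions(myRDDPreStep)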

answered Sep 25 '22 by Glenn Strycker

The API docs for RDD.distinct() only provide a one-sentence description:

"Return a new RDD containing the distinct elements in this RDD."

From recent experience I can tell you that in a tuple-RDD the tuple as a whole is considered.
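A tiny illustration of that, assuming an existing SparkContext sc (the values are made up):

val pairs = sc.parallelize(Seq((1, "a"), (1, "a"), (1, "b"), (2, "a")))
pairs.distinct().collect()
// => Array((1,a), (1,b), (2,a)), in some order: only the fully identical
//    tuple (1,a) is collapsed; (1,b) survives even though key 1 repeats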

If you want distinct keys or distinct values, then depending on exactly what you want to accomplish, you can either:

A. call groupByKey() to transform {(k1,v11),(k1,v12),(k2,v21),(k2,v22)} into {(k1,[v11,v12]), (k2,[v21,v22])}; or

B. strip out either the keys or the values by calling keys() or values(), followed by distinct() (both options are sketched below)
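For example, on a toy pair RDD (again assuming an existing SparkContext sc):

val kv = sc.parallelize(Seq(("k1", "v11"), ("k1", "v12"), ("k2", "v21"), ("k2", "v22")))

// A. group all values under each key
kv.groupByKey().collect()        // e.g. Array((k1, [v11, v12]), (k2, [v21, v22]))

// B. keep only the keys or only the values, then deduplicate
kv.keys.distinct().collect()     // Array(k1, k2)
kv.values.distinct().collect()   // Array(v11, v12, v21, v22)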

As of this writing (June 2015), UC Berkeley and edX are running a free online course, Introduction to Big Data and Apache Spark, which provides hands-on practice with these functions.

answered Sep 24 '22 by Paul