 

Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame

Let's say I have a rather large dataset in the following form:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])

What I would like to do is remove duplicate rows based on the values of the first, third and fourth columns only.

Removing entirely duplicate rows is straightforward:

data = data.distinct() 

and either row 5 or row 6 will be removed

But how do I remove duplicate rows based on columns 1, 3 and 4 only? I.e. remove either one of these:

('Baz',22,'US',6)
('Baz',36,'US',6)

In Python pandas, this could be done by specifying the columns with .drop_duplicates(). How can I achieve the same in Spark/PySpark?

asked May 14 '15 by Jason

People also ask

How do I remove duplicates in spark RDD?

Use dropDuplicates() to remove duplicate rows from a DataFrame. Spark's distinct() does not take the columns to run the distinct on; however, Spark provides another signature of dropDuplicates() that takes multiple columns to eliminate duplicates.

How do I remove duplicates in RDD PySpark?

If you want to remove all duplicates from a particular column or set of columns, i.e. do a distinct on a set of columns, then PySpark has the function dropDuplicates, which accepts a specific set of columns to distinct on.

How do I remove duplicates based on two columns in PySpark?

PySpark's distinct() function is used to drop duplicate rows (comparing all columns) from a DataFrame, while dropDuplicates() is used to drop duplicates based on selected (one or multiple) columns.
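As a rough illustration of that distinction, here is a minimal sketch; the SparkSession variable and the column names are assumptions made for readability and do not come from the question (createDataFrame/SparkSession assumes Spark 2.x or later):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [('Baz', 22, 'US', 6), ('Baz', 36, 'US', 6)],
    ['name', 'age', 'country', 'score'])

df.distinct().show()                                     # compares whole rows: both survive, the ages differ
df.dropDuplicates(['name', 'country', 'score']).show()   # keeps only one of the two rows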


2 Answers

Pyspark does include a dropDuplicates() method, which was introduced in 1.4. https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+

>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+
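Applied to the RDD from the question, the same approach might look roughly like this. This is a sketch only: the column names passed to toDF() are invented here (the question's tuples have no names), and it assumes a SQLContext/SparkSession is active so that toDF() is available on RDDs, as in the shell session above.

df = data.toDF(['name', 'age', 'country', 'score'])       # convert the RDD of tuples to a DataFrame
deduped = df.dropDuplicates(['name', 'country', 'score'])  # dedupe on columns 1, 3 and 4 only
deduped.show()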
answered Sep 23 '22 by vaer-k


From your question, it is unclear as to which columns you want to use to determine duplicates. The general idea behind the solution is to create a key based on the values of the columns that identify duplicates. Then, you can use the reduceByKey or reduce operations to eliminate duplicates.

Here is some code to get you started:

def get_key(x):
    return "{0}{1}{2}".format(x[0], x[2], x[3])

m = data.map(lambda x: (get_key(x), x))

Now, you have a key-value RDD that is keyed by columns 1, 3 and 4. The next step would be either a reduceByKey or a groupByKey and filter. This would eliminate duplicates.

r = m.reduceByKey(lambda x, y: x)
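The pairs in r still carry the synthetic key, so a final step (not shown in the answer) would be to drop it again; a small sketch:

deduped = r.values()   # back to an RDD of the original tuples, one per (col1, col3, col4) key
deduped.collect()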
answered Sep 23 '22 by Mike