Let's say I have a rather large dataset in the following form:
data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])
What I would like to do is remove duplicate rows based on the values of the first, third and fourth columns only.
Removing entirely duplicate rows is straightforward:
data = data.distinct()
which removes rows that are identical across every column.
But how do I remove duplicate rows based on columns 1, 3 and 4 only? i.e. remove either one of these:
('Baz',22,'US',6)
('Baz',36,'US',6)
In Python, this could be done by specifying the columns with .drop_duplicates(). How can I achieve the same in Spark/PySpark?
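For reference, a minimal pandas version of what I mean might look like this (assuming the tuples are loaded into a pandas DataFrame with the default integer column labels):

import pandas as pd

rows = [('Foo', 41, 'US', 3), ('Foo', 39, 'UK', 1),
        ('Bar', 57, 'CA', 2), ('Bar', 72, 'CA', 2),
        ('Baz', 22, 'US', 6), ('Baz', 36, 'US', 6)]
pdf = pd.DataFrame(rows)  # columns get the default labels 0, 1, 2, 3

# Keep the first row for each combination of columns 0, 2 and 3
print(pdf.drop_duplicates(subset=[0, 2, 3]))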
Use dropDuplicates() to remove duplicate rows from a DataFrame. Spark's distinct() does not accept a set of columns to run the distinct on; however, dropDuplicates() has a signature that takes a list of columns and eliminates duplicates based on those columns only. In other words, distinct() drops rows that are duplicated across all columns, while dropDuplicates() drops rows based on one or more selected columns.
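A minimal sketch of how this might look for the data in the question (assuming a SparkSession is available; tuples converted with toDF() get the default column names _1 to _4):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

data = sc.parallelize([('Foo', 41, 'US', 3), ('Foo', 39, 'UK', 1),
                       ('Bar', 57, 'CA', 2), ('Bar', 72, 'CA', 2),
                       ('Baz', 22, 'US', 6), ('Baz', 36, 'US', 6)])

df = data.toDF()  # columns are named _1, _2, _3, _4 by default

# Keep one row per combination of the 1st, 3rd and 4th columns;
# which of the matching rows survives is not guaranteed.
df.dropDuplicates(['_1', '_3', '_4']).show()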
PySpark does include a dropDuplicates() method, which was introduced in 1.4: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+

>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+
From your question, it is unclear as to which columns you want to use to determine duplicates. The general idea behind the solution is to create a key based on the values of the columns that identify duplicates. Then, you can use the reduceByKey or reduce operations to eliminate duplicates.
Here is some code to get you started:
def get_key(x):
    return "{0}{1}{2}".format(x[0], x[2], x[3])

m = data.map(lambda x: (get_key(x), x))
Now, you have a key-value RDD that is keyed by columns 1, 3 and 4. The next step would be either a reduceByKey or a groupByKey and a filter. This would eliminate duplicates:
r = m.reduceByKey(lambda x, y: x)
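A sketch of how the deduplicated rows might be pulled back out of r (the variable name deduped is just illustrative; which of the tied rows survives, and the order collect() returns them in, are not guaranteed):

# Drop the synthetic key again and keep only the original tuples
deduped = r.map(lambda kv: kv[1])
print(deduped.collect())
# Roughly: [('Foo', 41, 'US', 3), ('Foo', 39, 'UK', 1),
#           ('Bar', 57, 'CA', 2), ('Baz', 22, 'US', 6)]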