
PySpark Drop Rows

People also ask

How do you filter blank rows in PySpark?

In PySpark you can filter rows with NULL values by using the DataFrame filter() or where() functions together with the isNull() method of the Column class. For example, df.filter(df.state.isNull()) returns all rows whose state column is null as a new DataFrame.
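
A minimal sketch of that pattern, assuming an existing SparkSession and a hypothetical DataFrame with a state column:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical example data with a nullable `state` column.
df = spark.createDataFrame(
    [("alice", "NY"), ("bob", None), ("carol", "CA")],
    ["name", "state"],
)

# Keep only the rows where `state` is null; where() is an alias for filter().
df.filter(df.state.isNull()).show()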


Specific to PySpark:

As per @maasg, you could do this:

header = rdd.first()
rdd.filter(lambda line: line != header)

but it's not technically correct: any data line that happens to be identical to the header would be excluded as well, not just the header itself. However, this seems to work for me:

def remove_header(itr_index, itr):
    # Drop only the first element of partition 0; leave other partitions untouched.
    return iter(list(itr)[1:]) if itr_index == 0 else itr
rdd.mapPartitionsWithIndex(remove_header)

Similarly:

rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0])

I'm new to Spark, so can't intelligently comment about which will be fastest.
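
To make the difference concrete, here's a small sketch (assuming an existing SparkContext sc) with a data line that happens to be identical to the header:

rdd = sc.parallelize(["name,age", "alice,30", "name,age", "bob,25"], 2)

header = rdd.first()
print(rdd.filter(lambda line: line != header).collect())
# ['alice,30', 'bob,25'] -- the duplicate data line is dropped too

print(rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0]).collect())
# ['alice,30', 'name,age', 'bob,25'] -- only the first element is removed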


AFAIK there's no 'easy' way to do this.

This should do the trick, though (in Scala):

val header = data.first
val rows = data.filter(line => line != header)

A straightforward way to achieve this in PySpark (Python API), assuming you are using Python 3:

noHeaderRDD = rawRDD.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys()
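
For context, a hedged end-to-end usage sketch, assuming an existing SparkContext sc and a hypothetical input path:

rawRDD = sc.textFile('data/example_with_header.csv')  # hypothetical path
noHeaderRDD = rawRDD.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys()
print(noHeaderRDD.take(3))  # first few data lines, header excluded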

I did some profiling with the various solutions and got the following results.

Cluster Configuration

  • Cluster 1 : 4 Cores, 16 GB
  • Cluster 2 : 4 Cores, 16 GB
  • Cluster 3 : 4 Cores, 16 GB
  • Cluster 4 : 2 Cores, 8 GB

Data

7 million rows, 4 columns

# Solution 1
# Time Taken : 40 ms
data = sc.textFile('file1.txt')
firstRow = data.first()
data = data.filter(lambda row: row != firstRow)

# Solution 2
# Time Taken : 3 seconds
data = sc.textFile('file1.txt')
def dropFirstRow(index, iterator):
    # Materialises the whole first partition in memory before slicing off the header.
    return iter(list(iterator)[1:]) if index == 0 else iterator
data = data.mapPartitionsWithIndex(dropFirstRow)

# Solution 3
# Time Taken : 0.3 seconds
data = sc.textFile('file1.txt')
def dropFirstRow(index, iterator):
    if index == 0:
        # Stream the first partition, skipping only its first element.
        for subIndex, item in enumerate(iterator):
            if subIndex > 0:
                yield item
    else:
        # Pass every other partition's elements through unchanged.
        yield from iterator

data = data.mapPartitionsWithIndex(dropFirstRow)

I think that Solution 3 is the most scalable: unlike Solution 2, it never materialises the whole first partition in memory, it simply streams past the header.
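
For what it's worth, the same streaming behaviour can be written more compactly with itertools.islice; this is only a sketch along the same lines, not part of the benchmark above:

from itertools import islice

def dropFirstRow(index, iterator):
    # Skip only the first element of partition 0; stream everything else lazily.
    return islice(iterator, 1, None) if index == 0 else iterator

data = sc.textFile('file1.txt').mapPartitionsWithIndex(dropFirstRow)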