 

How can I return an empty (null?) item back from a map method in PySpark?

I am writing a map method using

RDD.map(lambda line: my_method(line))

and based on a particular condition in my_method (let's say the line starts with 'a'), I want to either return a particular value or ignore the item altogether.

For now, I am returning -1 if the condition is not met on the item, and then using another RDD.filter() call to remove all the items with -1.

Is there a better way to ignore these items, for example by returning null (None) from my_method?
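
For reference, a minimal sketch of that workaround (my_method, the -1 sentinel, and the rdd variable are just placeholders for the names above; the RDD is assumed to contain strings):

    def my_method(line):
        # return a transformed value if the condition holds, otherwise a sentinel
        return line.lower() if line.startswith("a") else -1

    result = rdd.map(lambda line: my_method(line)).filter(lambda x: x != -1)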

asked Dec 15 '15 by London guy

People also ask

How do I get null values in PySpark?

In a PySpark DataFrame you can calculate the count of null, None, NaN, or empty/blank values in a column by using isNull() of the Column class together with the SQL functions isnan(), count(), and when().
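
A minimal sketch of that pattern (df and the numeric column "amount" are hypothetical):

    from pyspark.sql import functions as F

    # count rows where the numeric column "amount" is NULL or NaN
    df.select(
        F.count(F.when(F.col("amount").isNull() | F.isnan("amount"), 1)).alias("null_or_nan_count")
    ).show()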

How do you handle null in PySpark?

In PySpark you can filter rows with NULL values by using the DataFrame filter() or where() functions and checking isNull() on the relevant Column. Such a statement returns all rows that have null values in that column as a new DataFrame; filter() and where() produce the same output.
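
For example (df and the column name "state" are placeholders):

    # keep only the rows where "state" is NULL
    df.filter(df.state.isNull()).show()

    # where() is an alias for filter(), so this is equivalent
    df.where(df.state.isNull()).show()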

How do you remove null values from a column in PySpark?

In order to remove rows with NULL values in selected columns of a PySpark DataFrame, use na.drop() or dropna() with a subset of columns (the underlying Scala API exposes drop(columns: Seq[String])). Pass the names of the columns you want to check for NULL values in order to delete those rows.
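
A minimal sketch, again with placeholder names:

    # drop rows where "state" or "city" is NULL
    df.na.drop(subset=["state", "city"]).show()

    # dropna() on the DataFrame is equivalent
    df.dropna(subset=["state", "city"]).show()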

How do you filter null values in PySpark DataFrame?

For filtering out NULL/None values, the PySpark API provides filter(), used together with the isNotNull() function. Syntax: df.filter(condition) returns a new DataFrame containing only the rows that satisfy the given condition.
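
For example (the column name is again a placeholder):

    # keep only the rows where "state" is not NULL
    df.filter(df.state.isNotNull()).show()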


1 Answer

In cases like this, flatMap is your friend:

  1. Adjust my_method so it returns either a single-element list or an empty list (or create a wrapper like the one in "What is the equivalent to scala.util.Try in pyspark?"):

    def my_method(line):
        # return a one-element list when the line qualifies, an empty list otherwise
        return [line.lower()] if line.startswith("a") else []
    
  2. flatMap

    rdd = sc.parallelize(["aDSd", "CDd", "aCVED"])
    
    # flatMap flattens the returned lists, so empty lists simply disappear
    rdd.flatMap(lambda line: my_method(line)).collect()
    ## ['adsd', 'acved']
    
answered Sep 25 '22 by zero323