 

If-else in PySpark for collapsing column values

I am trying to collapse a categorical variable in my DataFrame into binary classes after indexing. Currently my column has 3 classes: "A", "B", "C". I am writing a simple if-else statement to collapse the classes, like this:

def condition(r):
    if (r.wo_flag=="SLM" or r.wo_flag=="NON-SLM"):
        r.wo_flag="dispatch"
    else:
        r.wo_flag="non_dispatch"
    return r.wo_flag

df_final=df_new.map(lambda x: condition(x)) 

It's not working; it doesn't seem to understand the else condition.

|MData|Recode12|Status|DayOfWeekOfDispatch|MannerOfDispatch|Wo_flag|PlaceOfInjury|Race|
|    M|      11|     M|                  4|               7|      C|           99|   1|
|    M|       8|     D|                  3|               7|      A|           99|   1|
|    F|      10|     W|                  2|               7|      C|           99|   1|
|    M|       9|     D|                  1|               7|      B|           99|   1|
|    M|       8|     D|                  2|               7|      C|           99|   1|

This is the sample data.

Shweta Kamble asked May 04 '16 20:05




2 Answers

The accepted answer is not very efficient because it uses a user-defined function (UDF): a Python UDF has to ship every row between the JVM and a Python worker.

I think most people are looking for the built-in when function.

from pyspark.sql.functions import when

matches = df["wo_flag"].isin("SLM", "NON-SLM")
new_df = df.withColumn("wo_flag", when(matches, "dispatch").otherwise("non-dispatch"))
mcskinner answered Oct 16 '22 16:10


Try this:

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

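def modify_values(r):
    # Collapse classes "A" and "B" into "dispatch"; everything else becomes "non-dispatch"
    if r == "A" or r == "B":
        return "dispatch"
    else:
        return "non-dispatch"

# Wrap the Python function as a UDF that returns a string column
ol_val = udf(modify_values, StringType())
new_df = df.withColumn("wo_flag", ol_val(df.wo_flag))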

Things you are doing wrong:

  • You are trying to modify Rows (Rows are immutable).
  • When a map operation is done on a DataFrame, the resulting data structure is a PipelinedRDD and not a DataFrame. You have to apply .toDF() to get a DataFrame back.
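
If you do want to stay with the map approach, the two points above could be handled roughly as in the sketch below. It is only an illustration under some assumptions: the column is really named wo_flag, "A" and "B" are the classes to collapse (as in the UDF above), and the helper names collapse_flag, recode_row and collapse_with_rdd are made up for this example. Instead of assigning to the Row, each row is copied to a dict, changed, and emitted as a new record; the mapped RDD is then converted back with toDF().

```python
def collapse_flag(value):
    """Map a raw wo_flag value to one of two classes (assumed rule: A/B -> dispatch)."""
    return "dispatch" if value in ("A", "B") else "non-dispatch"


def recode_row(row_dict):
    """Return a NEW dict with wo_flag collapsed; the input dict is left untouched."""
    updated = dict(row_dict)
    updated["wo_flag"] = collapse_flag(updated["wo_flag"])
    return updated


def collapse_with_rdd(df):
    """Hypothetical helper: recode wo_flag through the RDD API, then go back to a DataFrame.

    Only standard DataFrame/RDD attributes are used here (df.columns, df.schema,
    df.rdd, Row.asDict, RDD.map, RDD.toDF), so nothing from pyspark needs importing.
    """
    columns = df.columns
    schema = df.schema

    def to_tuple(row):
        # Rows are immutable, so work on a dict copy and emit a fresh tuple
        # in the original column order.
        recoded = recode_row(row.asDict())
        return tuple(recoded[name] for name in columns)

    # map() on the underlying RDD yields an RDD, not a DataFrame;
    # toDF() with the original schema converts it back.
    return df.rdd.map(to_tuple).toDF(schema)
```

Usage would be something like new_df = collapse_with_rdd(df_new). This still runs Python code for every row, so the when-based answer is likely the better choice whenever the rule can be expressed with column functions.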
Himaprasoon answered Oct 16 '22 15:10