
Using when and otherwise while converting boolean values to strings in PySpark

I have a data frame in PySpark:

df.show()


+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  true|
|  2| Ram|      Y|      0.05|   10| false|
|  3| Ian|      N|      0.01|    1| false|
|  4| Jim|      N|       1.2|    3|  true|
+---+----+-------+----------+-----+------+

The schema is below:

DataFrame[id: int, name: string, testing: string, avg_result: string, score: string, active: boolean]

I want to convert Y to True, N to False, true to True, and false to False.

When I try the following:

for col in cols:
    df = df.withColumn(col, f.when(f.col(col) == 'N', 'False').when(f.col(col) == 'Y', 'True').
                       when(f.col(col) == 'true', True).when(f.col(col) == 'false', False).otherwise(f.col(col)))

I get the error below and there is no change in the data frame:

pyspark.sql.utils.AnalysisException: u"cannot resolve 'CASE WHEN (testing = N) THEN False WHEN (testing = Y) THEN True WHEN (testing = true) THEN true WHEN (testing = false) THEN false ELSE testing' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;"

+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  true|
|  2| Ram|      Y|      0.05|   10| false|
|  3| Ian|      N|      0.01|    1| false|
|  4| Jim|      N|       1.2|    3|  true|
+---+----+-------+----------+-----+------+

When I try the following:

for col in cols:
    df = df.withColumn(col, f.when(f.col(col) == 'N', 'False').when(f.col(col) == 'Y', 'True').otherwise(f.col(col)))

I get the error below:

pyspark.sql.utils.AnalysisException: u"cannot resolve 'CASE WHEN if ((isnull(active) || isnull(cast(N as double)))) null else CASE cast(cast(N as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN False WHEN if ((isnull(active) || isnull(cast(Y as double)))) null else CASE cast(cast(Y as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN True ELSE active' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;"

But the data frame changes to:

+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  true|
|  2| Ram|   True|      0.05|   10| false|
|  3| Ian|  False|      0.01|    1| false|
|  4| Jim|  False|       1.2|    3|  true|
+---+----+-------+----------+-----+------+

New attempt

for col in cols:
    df = df.withColumn(col, f.when(f.col(col) == 'N', 'False').when(f.col(col) == 'Y', 'True').
                       when(f.col(col) == 'true', 'True').when(f.col(col) == 'false', 'False').otherwise(f.col(col)))

Error received

pyspark.sql.utils.AnalysisException: u"cannot resolve 'CASE WHEN if ((isnull(active) || isnull(cast(N as double)))) null else CASE cast(cast(N as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN False WHEN if ((isnull(active) || isnull(cast(Y as double)))) null else CASE cast(cast(Y as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN True WHEN if ((isnull(active) || isnull(cast(true as double)))) null else CASE cast(cast(true as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN True WHEN if ((isnull(active) || isnull(cast(false as double)))) null else CASE cast(cast(false as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN False ELSE active' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;"

How can I get the data frame to look like the following?

+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  True|
|  2| Ram|   True|      0.05|   10| False|
|  3| Ian|  False|      0.01|    1| False|
|  4| Jim|  False|       1.2|    3|  True|
+---+----+-------+----------+-----+------+
asked Jul 02 '18 by User12345


People also ask

How do you use when and otherwise in PySpark?

PySpark's when() is a SQL function that returns a Column; to use it, first import it from pyspark.sql.functions. otherwise() is a method of Column, and if otherwise() is not used and none of the conditions are met, the result is None (null). Usage looks like when(condition, value).otherwise(default).
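For illustration, a minimal self-contained sketch (the flag column and its values are made up for this example, not from the question):

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([("Y",), ("N",), (None,)], ["flag"])

# Rows matching no condition would become null if otherwise() were omitted
sdf.select(
    f.when(f.col("flag") == "Y", "True")
     .when(f.col("flag") == "N", "False")
     .otherwise(f.col("flag"))
     .alias("flag_str")
).show()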

How do you convert string to boolean in PySpark?

In PySpark, you can cast or change a DataFrame column's data type using the cast() function of the Column class, for example via withColumn(), selectExpr(), or a SQL expression, to cast from String to Int (IntegerType), String to Boolean, etc.
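A minimal sketch of both styles, using a hypothetical active_str column:

from pyspark.sql import SparkSession
from pyspark.sql.types import BooleanType

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([("true",), ("false",)], ["active_str"])

# Cast via withColumn() ...
sdf.withColumn("active_bool", sdf["active_str"].cast(BooleanType())).show()
# ... or via selectExpr() with a SQL CAST
sdf.selectExpr("CAST(active_str AS boolean) AS active_bool").show()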

How do you cast to string in PySpark?

To typecast an integer to a string in PySpark, use the cast() function with StringType() as the argument; to typecast a string to an integer, use cast() with IntegerType() as the argument.
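A short sketch of both directions on a toy DataFrame (column names are hypothetical):

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1, "10"), (2, "20")], ["num", "num_str"])

# int -> string and string -> int
sdf = sdf.withColumn("num_as_str", f.col("num").cast(StringType())) \
         .withColumn("num_str_as_int", f.col("num_str").cast(IntegerType()))
sdf.printSchema()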

How do you convert a column to a string in PySpark?

To convert an array to a string, PySpark SQL provides the built-in function concat_ws(), which takes a delimiter of your choice as the first argument and an array column (type Column) as the second. To use concat_ws(), import it from pyspark.sql.functions.
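A small sketch with toy data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(["a", "b", "c"],)], ["letters"])

# Join the array elements with a comma delimiter
sdf.select(concat_ws(",", "letters").alias("letters_str")).show()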


2 Answers

As I mentioned in the comments, the issue is a type mismatch. You need to convert the boolean column to a string before doing the comparison. Finally, you need to cast the column to a string in the otherwise() as well (you can't have mixed types in a column).

Your code is easy to modify to get the correct output:

import pyspark.sql.functions as f

cols = ["testing", "active"]
for col in cols:
    df = df.withColumn(
        col, 
        f.when(
            f.col(col) == 'N',
            'False'
        ).when(
            f.col(col) == 'Y',
            'True'
        ).when(
            f.col(col).cast('string') == 'true',
            'True'
        ).when(
            f.col(col).cast('string') == 'false',
            'False'
        ).otherwise(f.col(col).cast('string'))
    )
df.show()
#+---+----+-------+----------+-----+------+
#| id|name|testing|avg_result|score|active|
#+---+----+-------+----------+-----+------+
#|  1| sam|   null|      null| null|  True|
#|  2| Ram|   True|      0.05|   10| False|
#|  3| Ian|  False|      0.01|    1| False|
#|  4| Jim|  False|       1.2|    3|  True|
#+---+----+-------+----------+-----+------+
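
As a quick optional sanity check, df.dtypes should now report string for both transformed columns:

print(df.dtypes)
#[('id', 'int'), ('name', 'string'), ('testing', 'string'),
# ('avg_result', 'string'), ('score', 'string'), ('active', 'string')]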

However, there are some alternative approaches as well. For instance, this is a good place to use pyspark.sql.Column.isin():

from functools import reduce  # reduce is a builtin in Python 2 but must be imported in Python 3

df = reduce(
    lambda df, col: df.withColumn(
        col, 
        f.when(
            f.col(col).cast('string').isin(['N', 'false']),
            'False'
        ).when(
            f.col(col).cast('string').isin(['Y', 'true']),
            'True'
        ).otherwise(f.col(col).cast('string'))
    ),
    cols,
    df
)
df.show()
#+---+----+-------+----------+-----+------+
#| id|name|testing|avg_result|score|active|
#+---+----+-------+----------+-----+------+
#|  1| sam|   null|      null| null|  True|
#|  2| Ram|   True|      0.05|   10| False|
#|  3| Ian|  False|      0.01|    1| False|
#|  4| Jim|  False|       1.2|    3|  True|
#+---+----+-------+----------+-----+------+

(Here I used reduce to eliminate the for loop, but you could have kept it.)
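
Kept, the same isin() logic with the for loop would look like this:

for col in cols:
    df = df.withColumn(
        col,
        f.when(f.col(col).cast('string').isin(['N', 'false']), 'False')
        .when(f.col(col).cast('string').isin(['Y', 'true']), 'True')
        .otherwise(f.col(col).cast('string'))
    )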

You could also use pyspark.sql.DataFrame.replace() but you'd have to first convert the column active to a string:

df = df.withColumn('active', f.col('active').cast('string'))\
    .replace(['Y', 'true'], 'True', subset=cols)\
    .replace(['N', 'false'], 'False', subset=cols)
df.show()
# results omitted, but it's the same as above

Or using replace just once:

df = df.withColumn('active', f.col('active').cast('string'))\
    .replace(['Y', 'true', 'N', 'false'], ['True', 'True', 'False', 'False'], subset=cols)
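
replace() also accepts a dict that maps old values to new ones, which some may find more readable (same result either way):

df = df.withColumn('active', f.col('active').cast('string'))\
    .replace({'Y': 'True', 'true': 'True', 'N': 'False', 'false': 'False'}, subset=cols)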
answered Oct 14 '22 by pault


Looking at the schema and the transformations applied, there is a type mismatch between the String and Boolean values returned, e.g. 'N' is returned as 'False' (a string) while 'false' is returned as False (a boolean).

You can cast the transformed columns to String to convert Y to True, N to False, true to True and false to False.

from pyspark.sql.types import *
from pyspark.sql import functions as f

data = [
  (1, "sam", None, None, None, True),
  (2, "Ram", "Y", "0.05", "10", False),
  (3, "Ian", "N", "0.01", "1", False),
  (4, "Jim", "N", "1.2", "3", True)
  ]

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("name", StringType(), True),
  StructField("testing", StringType(), True),
  StructField("avg_result", StringType(), True),
  StructField("score", StringType(), True),
  StructField("active", BooleanType(), True)
  ])

df = sc.parallelize(data).toDF(schema)
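
(If you have a SparkSession, conventionally named spark, instead of a SparkContext, the same DataFrame can be built with createDataFrame:)

df = spark.createDataFrame(data, schema)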

Before applying the transformations

>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- testing: string (nullable = true)
|-- avg_result: string (nullable = true)
|-- score: string (nullable = true)
|-- active: boolean (nullable = true)

>>> df.show()
+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  true|
|  2| Ram|      Y|      0.05|   10| false|
|  3| Ian|      N|      0.01|    1| false|
|  4| Jim|      N|       1.2|    3|  true|
+---+----+-------+----------+-----+------+

Applying the transformation with a cast in the otherwise clause, .otherwise(f.col(col).cast("string")):

cols = ["testing", "active"]

for col in cols:
    df = df.withColumn(col,
      f.when(f.col(col) == 'N', 'False')
      .when(f.col(col) == 'Y', 'True')
      .when(f.col(col).cast("string") == 'true', 'True')
      .when(f.col(col).cast("string") == 'false', 'False')
      .otherwise(f.col(col).cast("string")))

Results

>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- testing: string (nullable = true)
|-- avg_result: string (nullable = true)
|-- score: string (nullable = true)
|-- active: string (nullable = true)

>>> df.show()
+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  True|
|  2| Ram|   True|      0.05|   10| False|
|  3| Ian|  False|      0.01|    1| False|
|  4| Jim|  False|       1.2|    3|  True|
+---+----+-------+----------+-----+------+
answered Oct 14 '22 by raj