I have a data frame in PySpark:
df.show()
+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  true|
|  2| Ram|      Y|      0.05|   10| false|
|  3| Ian|      N|      0.01|    1| false|
|  4| Jim|      N|       1.2|    3|  true|
+---+----+-------+----------+-----+------+
The schema is below:
DataFrame[id: int, name: string, testing: string, avg_result: string, score: string, active: boolean]
I want to convert Y to True, N to False, true to True, and false to False.
When I run the following:
for col in cols:
    df = df.withColumn(col, f.when(f.col(col) == 'N', 'False').when(f.col(col) == 'Y', 'True').
        when(f.col(col) == 'true', True).when(f.col(col) == 'false', False).otherwise(f.col(col)))
I get the error below, and the data frame is unchanged:
pyspark.sql.utils.AnalysisException: u"cannot resolve 'CASE WHEN (testing = N) THEN False WHEN (testing = Y) THEN True WHEN (testing = true) THEN true WHEN (testing = false) THEN false ELSE testing' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;"
+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  true|
|  2| Ram|      Y|      0.05|   10| false|
|  3| Ian|      N|      0.01|    1| false|
|  4| Jim|      N|       1.2|    3|  true|
+---+----+-------+----------+-----+------+
When I run the following instead:
for col in cols:
    df = df.withColumn(col, f.when(f.col(col) == 'N', 'False').when(f.col(col) == 'Y', 'True').otherwise(f.col(col)))
I get the error below:
pyspark.sql.utils.AnalysisException: u"cannot resolve 'CASE WHEN if ((isnull(active) || isnull(cast(N as double)))) null else CASE cast(cast(N as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN False WHEN if ((isnull(active) || isnull(cast(Y as double)))) null else CASE cast(cast(Y as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN True ELSE active' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;"
But the data frame changes to
+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  true|
|  2| Ram|   True|      0.05|   10| false|
|  3| Ian|  False|      0.01|    1| false|
|  4| Jim|  False|       1.2|    3|  true|
+---+----+-------+----------+-----+------+
New attempt:
for col in cols:
    df = df.withColumn(col, f.when(f.col(col) == 'N', 'False').when(f.col(col) == 'Y', 'True').
        when(f.col(col) == 'true', 'True').when(f.col(col) == 'false', 'False').otherwise(f.col(col)))
Error received:
pyspark.sql.utils.AnalysisException: u"cannot resolve 'CASE WHEN if ((isnull(active) || isnull(cast(N as double)))) null else CASE cast(cast(N as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN False WHEN if ((isnull(active) || isnull(cast(Y as double)))) null else CASE cast(cast(Y as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN True WHEN if ((isnull(active) || isnull(cast(true as double)))) null else CASE cast(cast(true as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN True WHEN if ((isnull(active) || isnull(cast(false as double)))) null else CASE cast(cast(false as double) as double) WHEN cast(1 as double) THEN active WHEN cast(0 as double) THEN NOT active ELSE false THEN False ELSE active' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;"
How can I get the data frame to look like this?
+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  True|
|  2| Ram|   True|      0.05|   10| False|
|  3| Ian|  False|      0.01|    1| False|
|  4| Jim|  False|       1.2|    3|  True|
+---+----+-------+----------+-----+------+
Using when() and otherwise() on a PySpark DataFrame: when() is a SQL function that must be imported from pyspark.sql.functions before use, and it returns a Column. otherwise() is a method of Column; if otherwise() is not used and none of the conditions are met, the result is None (null). Usage looks like when(condition, value).otherwise(default).
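As a mental model only, the behavior described above can be sketched in plain Python. This is not Spark's implementation, and case_when is a hypothetical name used purely for illustration. It shows that branches are checked in order, that the first match wins, and that a missing default yields None (null).

```python
def case_when(value, branches, default=None):
    """Plain-Python model of a when(...).when(...).otherwise(...) chain.

    branches is a list of (expected, result) pairs, checked in order.
    """
    for expected, result in branches:
        # A null (None) input never satisfies an equality test, as in SQL.
        if value is not None and value == expected:
            return result  # the first matching branch wins
    # No branch matched: fall back to the default, which is None (null)
    # when no otherwise() value is supplied.
    return default


branches = [("Y", "True"), ("N", "False")]

print(case_when("Y", branches))               # True
print(case_when("maybe", branches))           # None
print(case_when("maybe", branches, "maybe"))  # maybe
```

In Spark every branch result and the default must also share one type (or be coercible to one), which this untyped sketch does not enforce.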
In PySpark, you can change a DataFrame column's data type with the cast() function of the Column class. It can be used with withColumn(), selectExpr(), or a SQL expression to cast, for example, from String to Int (IntegerType) or from String to Boolean.
To typecast an integer to a string in PySpark, use cast() with StringType() as the argument; to typecast a string to an integer, use cast() with IntegerType() as the argument.
As I mentioned in the comments, the issue is a type mismatch. You need to convert the boolean column to a string before doing the comparison. You also need to cast the column to a string in the otherwise() branch (you can't have mixed types in a column).
Your code is easy to modify to get the correct output:
import pyspark.sql.functions as f
cols = ["testing", "active"]
for col in cols:
    df = df.withColumn(
        col,
        f.when(
            f.col(col) == 'N',
            'False'
        ).when(
            f.col(col) == 'Y',
            'True'
        ).when(
            f.col(col).cast('string') == 'true',
            'True'
        ).when(
            f.col(col).cast('string') == 'false',
            'False'
        ).otherwise(f.col(col).cast('string'))
    )
df.show()
#+---+----+-------+----------+-----+------+
#| id|name|testing|avg_result|score|active|
#+---+----+-------+----------+-----+------+
#|  1| sam|   null|      null| null|  True|
#|  2| Ram|   True|      0.05|   10| False|
#|  3| Ian|  False|      0.01|    1| False|
#|  4| Jim|  False|       1.2|    3|  True|
#+---+----+-------+----------+-----+------+
However, there are some alternative approaches as well. For instance, this is a good place to use pyspark.sql.Column.isin():
from functools import reduce  # reduce is not a builtin in Python 3
df = reduce(
    lambda df, col: df.withColumn(
        col,
        f.when(
            f.col(col).cast('string').isin(['N', 'false']),
            'False'
        ).when(
            f.col(col).cast('string').isin(['Y', 'true']),
            'True'
        ).otherwise(f.col(col).cast('string'))
    ),
    cols,
    df
)
df.show()
#+---+----+-------+----------+-----+------+
#| id|name|testing|avg_result|score|active|
#+---+----+-------+----------+-----+------+
#|  1| sam|   null|      null| null|  True|
#|  2| Ram|   True|      0.05|   10| False|
#|  3| Ian|  False|      0.01|    1| False|
#|  4| Jim|  False|       1.2|    3|  True|
#+---+----+-------+----------+-----+------+
(Here I used reduce to eliminate the for loop, but you could have kept it.)
You could also use pyspark.sql.DataFrame.replace(), but you'd have to first convert the column active to a string:
df = df.withColumn('active', f.col('active').cast('string'))\
    .replace(['Y', 'true'], 'True', subset=cols)\
    .replace(['N', 'false'], 'False', subset=cols)
df.show()
# results omitted, but it's the same as above
Or using replace just once:
df = df.withColumn('active', f.col('active').cast('string'))\
    .replace(['Y', 'true', 'N', 'false'], ['True', 'True', 'False', 'False'], subset=cols)
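The single replace() call above pairs two parallel lists, which can drift out of sync as values are added. A possible variation, sketched here under the assumption of a reasonably recent PySpark where replace() accepts a dict without a separate value argument, keeps each raw value next to its replacement. FLAG_MAP, boolean_columns and normalize_flags are hypothetical names introduced for this sketch, not part of the original answer.

```python
# Hypothetical mapping; the keys are the raw values seen in the question.
FLAG_MAP = {"Y": "True", "true": "True", "N": "False", "false": "False"}


def boolean_columns(dtypes, cols):
    """Return the names in cols whose Spark type is boolean.

    dtypes is a list of (name, type) pairs, in the shape of df.dtypes.
    """
    types = dict(dtypes)
    return [c for c in cols if types.get(c) == "boolean"]


def normalize_flags(df, cols, mapping=FLAG_MAP):
    """Cast boolean columns in cols to string, then apply one dict-based replace()."""
    # Imported lazily so the pure-Python helper above works without Spark.
    from pyspark.sql import functions as f

    for c in boolean_columns(df.dtypes, cols):
        df = df.withColumn(c, f.col(c).cast("string"))
    return df.replace(mapping, subset=cols)
```

With the data frame from the question, the call would be df = normalize_flags(df, ["testing", "active"]). Detecting boolean columns from df.dtypes avoids hard-coding the name active.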
Looking at the schema and the transformations applied, there is a type mismatch between the String and Boolean values returned. For example, 'N' is returned as 'False' (String), while 'false' is returned as False (Boolean).
You can cast the transformed columns to String to convert Y to True, N to False, true to True, and false to False.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType
from pyspark.sql import functions as f
data = [
    (1, "sam", None, None, None, True),
    (2, "Ram", "Y", 0.05, 10, False),
    (3, "Ian", "N", 0.01, 1, False),
    (4, "Jim", "N", 1.2, 3, True)
]
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("testing", StringType(), True),
    StructField("avg_result", StringType(), True),
    StructField("score", StringType(), True),
    StructField("active", BooleanType(), True)
])
# sc is the SparkContext provided by the PySpark shell
df = sc.parallelize(data).toDF(schema)
Before applying the transformations
>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- testing: string (nullable = true)
|-- avg_result: string (nullable = true)
|-- score: string (nullable = true)
|-- active: boolean (nullable = true)
>>> df.show()
+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  true|
|  2| Ram|      Y|      0.05|   10| false|
|  3| Ian|      N|      0.01|    1| false|
|  4| Jim|      N|       1.2|    3|  true|
+---+----+-------+----------+-----+------+
Applying the transformation, with a cast in the otherwise clause, .otherwise(f.col(col).cast("string")):
cols = ["testing", "active"]
for col in cols:
    df = df.withColumn(col,
        f.when(f.col(col) == 'N', 'False')
        .when(f.col(col) == 'Y', 'True')
        .when(f.col(col).cast("string") == 'true', 'True')
        .when(f.col(col).cast("string") == 'false', 'False')
        .otherwise(f.col(col).cast("string")))
Results
>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- testing: string (nullable = true)
|-- avg_result: string (nullable = true)
|-- score: string (nullable = true)
|-- active: string (nullable = true)
>>> df.show()
+---+----+-------+----------+-----+------+
| id|name|testing|avg_result|score|active|
+---+----+-------+----------+-----+------+
|  1| sam|   null|      null| null|  True|
|  2| Ram|   True|      0.05|   10| False|
|  3| Ian|  False|      0.01|    1| False|
|  4| Jim|  False|       1.2|    3|  True|
+---+----+-------+----------+-----+------+
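To confirm that no raw value slipped through the conversion, a small check over the collected rows can help. The sketch below is plain Python; unexpected_values and ALLOWED are hypothetical names, and it assumes the frame is small enough to collect to the driver.

```python
ALLOWED = {"True", "False", None}


def unexpected_values(rows, cols, allowed=ALLOWED):
    """Collect values outside `allowed`, keyed by column name.

    rows can be any sequence of objects indexable by column name,
    such as pyspark.sql.Row instances or plain dicts.
    """
    found = {}
    for row in rows:
        for c in cols:
            if row[c] not in allowed:
                found.setdefault(c, set()).add(row[c])
    return found


# Stand-in rows for illustration; with Spark one might instead pass
# df.select(*cols).collect().
sample = [
    {"testing": None, "active": "True"},
    {"testing": "Y", "active": "False"},
]
print(unexpected_values(sample, ["testing", "active"]))  # {'testing': {'Y'}}
```

An empty dict means every value in the checked columns is 'True', 'False', or null.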