We are building a data ingestion framework in PySpark and are wondering what the best way is to handle datatype exceptions. Basically, we want to have a reject table capturing all the data that does not conform to the schema.
stringDf = sparkSession.createDataFrame(
    [
        ("11/25/1991", "1"),
        ("11/24/1991", None),
        ("11/30/1991", "a")
    ],
    ['dateAsString', 'intAsString']
)
Here is my stringDf with its two columns:
+------------+-----------+
|dateAsString|intAsString|
+------------+-----------+
|  11/25/1991|          1|
|  11/24/1991|       null|
|  11/30/1991|          a|
+------------+-----------+
I would like to add a new column to the data frame, called dataTypeValidationErrors, to capture all the errors that might be present in this dataset. What is the best way to achieve this in PySpark? The desired output looks like this:
+------------+-----------+------------------------+
|dateAsString|intAsString|dataTypeValidationErrors|
+------------+-----------+------------------------+
|  11/25/1991|          1|                    None|
|  11/24/1991|       null|                    None|
|  11/30/1991|          a|      Not a valid Number|
+------------+-----------+------------------------+
You can just try to cast the column to the desired DataType. If there is a mismatch or error, null will be returned. In these cases you also need to verify that the original value wasn't null; if it wasn't, the cast failed and there was an error.
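For instance, casting intAsString by itself shows where it fails (a quick check against the stringDf from the question; the castedValue alias is just for display):

import pyspark.sql.functions as f

stringDf.select(
    "intAsString",
    f.col("intAsString").cast("int").alias("castedValue")  # alias name is illustrative
).show()
#+-----------+-----------+
#|intAsString|castedValue|
#+-----------+-----------+
#|          1|          1|
#|       null|       null|
#|          a|       null|
#+-----------+-----------+

The value "a" becomes null because the cast fails, while the original null simply stays null.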
Use pyspark.sql.functions.when() to test whether the casted column is null while the original value is not. If that condition is true, use the string literal "Not a valid Number" as the column value; otherwise return the string "None". For example:
import pyspark.sql.functions as f

stringDf.withColumn(
    "dataTypeValidationErrors",
    f.when(
        f.col("intAsString").cast("int").isNull() & f.col("intAsString").isNotNull(),
        f.lit("Not a valid Number")
    ).otherwise(f.lit("None"))
).show()
#+------------+-----------+------------------------+
#|dateAsString|intAsString|dataTypeValidationErrors|
#+------------+-----------+------------------------+
#|  11/25/1991|          1|                    None|
#|  11/24/1991|       null|                    None|
#|  11/30/1991|          a|      Not a valid Number|
#+------------+-----------+------------------------+
You can also expand this approach to multiple columns.
Suppose you had one more row, this time with an invalid dateAsString value:
stringDf = sparkSession.createDataFrame(
    [
        ("11/25/1991", "1"),
        ("11/24/1991", None),
        ("11/30/1991", "a"),
        ("13.14.15", "b")
    ],
    ['dateAsString', 'intAsString']
)
Use a dictionary to define the conversion for each column:
conversions = {
    'dateAsString': lambda c: f.from_unixtime(f.unix_timestamp(c, "MM/dd/yyyy")).cast("date"),
    'intAsString': lambda c: f.col(c).cast('int')
}
stringDf.withColumn(
    "dataTypeValidationErrors",
    f.concat_ws(", ",
        *[
            f.when(
                v(k).isNull() & f.col(k).isNotNull(),
                f.lit(k + " not valid")
            ).otherwise(f.lit(None))
            for k, v in conversions.items()
        ]
    )
).show(truncate=False)
#+------------+-----------+---------------------------------------------+
#|dateAsString|intAsString|dataTypeValidationErrors                     |
#+------------+-----------+---------------------------------------------+
#|11/25/1991  |1          |                                             |
#|11/24/1991  |null       |                                             |
#|11/30/1991  |a          |intAsString not valid                        |
#|13.14.15    |b          |dateAsString not valid, intAsString not valid|
#+------------+-----------+---------------------------------------------+
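Note that concat_ws() skips nulls, which is why the error-free rows above come through as an empty string rather than null. If you'd rather keep null for clean rows, one option (a small sketch reusing the conversions dictionary above) is to wrap the concatenation in another when():

errors = f.concat_ws(", ",
    *[
        f.when(
            v(k).isNull() & f.col(k).isNotNull(),
            f.lit(k + " not valid")
        )
        for k, v in conversions.items()
    ]
)

stringDf.withColumn(
    "dataTypeValidationErrors",
    f.when(errors != "", errors)  # when() without otherwise() yields null
).show(truncate=False)

Clean rows then show null instead of the empty string.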
Or, if you just want to know whether a row had an error at all, without the specifics:
from functools import reduce  # reduce is not a builtin in Python 3

stringDf.withColumn(
    "dataTypeValidationErrors",
    f.when(
        reduce(
            lambda a, b: a | b,
            (v(k).isNull() & f.col(k).isNotNull() for k, v in conversions.items())
        ),
        f.lit("Validation Error")
    ).otherwise(f.lit("None"))
).show(truncate=False)
#+------------+-----------+------------------------+
#|dateAsString|intAsString|dataTypeValidationErrors|
#+------------+-----------+------------------------+
#|11/25/1991  |1          |None                    |
#|11/24/1991  |null       |None                    |
#|11/30/1991  |a          |Validation Error        |
#|13.14.15    |b          |Validation Error        |
#+------------+-----------+------------------------+
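Finally, since the original goal was a reject table: once the error column exists, you can split the DataFrame with filter() and route each half separately. A minimal sketch along those lines, assuming the result of the last example is kept as flaggedDf (the table name below is purely illustrative):

# assuming flaggedDf is the DataFrame produced by the last example,
# with "None" marking rows that passed validation
rejectDf = flaggedDf.filter(f.col("dataTypeValidationErrors") != "None")
cleanDf = flaggedDf.filter(f.col("dataTypeValidationErrors") == "None")

# e.g. append the rejects to a reject table ("ingestion_rejects" is a made-up name):
# rejectDf.write.mode("append").saveAsTable("ingestion_rejects")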