Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Data Type validation in pyspark

We are building a data ingestion framework in pyspark and wondering what the best way is to handle datatype exceptions. Basically, we want to have a reject table capturing all the data that does not confirm to the schema.

stringDf = sparkSession.createDataFrame(
    [
        ("11/25/1991","1"),
        ("11/24/1991", None),
        ("11/30/1991","a")
    ], 
    ['dateAsString','intAsString']
)

Here is my stringDf with two columns.

+------------+-----------+
|dateAsString|intAsString|
+------------+-----------+
|  11/25/1991|          1|
|  11/24/1991|       null|
|  11/30/1991|          a|
+------------+-----------+

I would like to create a new column to the data frame called dataTypeValidationErrors to capture all the errors that might be present in this dataset. What is the best way to achieve this using pyspark?

+------------+-----------+------------------------+
|dateAsString|intAsString|dataTypeValidationErrors|
+------------+-----------+------------------------+
|  11/25/1991|          1|None                    |
|  11/24/1991|       null|None                    |
|  11/30/1991|          a|Not a valid Number      |
+------------+-----------+------------------------+
like image 524
Anand Kannan Avatar asked Feb 06 '26 19:02

Anand Kannan


1 Answers

You can just try to cast the column to the desired DataType. If there is a mismatch or error, null will be returned. In these cases you need to verify that the original value wasn't null, and if not there was an error.

  • Use pyspark.sql.functions.when() to test if the casted column is null and the original value was not null.
  • If this is True, then use the string literal "Not a valid Number" as the column value. Otherwise return the string "None".

For example:

import pyspark.sql.functions as f

stringDf.withColumn(
        "dataTypeValidationErrors",
        f.when(
            f.col("intAsString").cast("int").isNull() & f.col("intAsString").isNotNull(),
            f.lit("Not a valid Number")
        ).otherwise(f.lit("None"))
    )\
    .show()
#+------------+-----------+------------------------+
#|dateAsString|intAsString|dataTypeValidationErrors|
#+------------+-----------+------------------------+
#|  11/25/1991|          1|                    None|
#|  11/24/1991|       null|                    None|
#|  11/30/1991|          a|      Not a valid Number|
#+------------+-----------+------------------------+

You could also expand this to multiple columns:

Suppose you had one more row with an invalid dateAsString value:

stringDf = spark.createDataFrame(
    [
        ("11/25/1991","1"),
        ("11/24/1991", None),
        ("11/30/1991","a"),
        ("13.14.15", "b")
    ], 
    ['dateAsString','intAsString']
)

Use a dictionary to define the conversion for each column:

conversions = {
    'dateAsString':lambda c: f.from_unixtime(f.unix_timestamp(c,"MM/dd/yyyy")).cast("date"),
    'intAsString':lambda c: f.col(c).cast('int')
}

stringDf.withColumn(
        "dataTypeValidationErrors",
        f.concat_ws(", ",
            *[
                f.when(
                    v(k).isNull() & f.col(k).isNotNull(),
                    f.lit(k + " not valid")
                ).otherwise(f.lit(None))
                for k, v in conversions.items()
            ]
        )
    )\
    .show(truncate=False)
#+------------+-----------+---------------------------------------------+
#|dateAsString|intAsString|dataTypeValidationErrors                     |
#+------------+-----------+---------------------------------------------+
#|11/25/1991  |1          |                                             |
#|11/24/1991  |null       |                                             |
#|11/30/1991  |a          |intAsString not valid                        |
#|13.14.15    |b          |dateAsString not valid, intAsString not valid|
#+------------+-----------+---------------------------------------------+

Or if you just want to know if there was an error on a row, without needing to know specifics:

stringDf.withColumn(
        "dataTypeValidationErrors",
        f.when(
            reduce(
                lambda a, b: a|b,
                (v(k).isNull() & f.col(k).isNotNull() for k, v in conversions.items())
            ),
            f.lit("Validation Error")
        ).otherwise(f.lit("None"))     
    )\
    .show(truncate=False)
#+------------+-----------+------------------------+
#|dateAsString|intAsString|dataTypeValidationErrors|
#+------------+-----------+------------------------+
#|11/25/1991  |1          |None                    |
#|11/24/1991  |null       |None                    |
#|11/30/1991  |a          |Validation Error        |
#|13.14.15    |b          |Validation Error        |
#+------------+-----------+------------------------+
like image 95
pault Avatar answered Feb 12 '26 15:02

pault