
Spark: Return empty column if column does not exist in dataframe

As shown in the code below, I am reading a JSON file into a dataframe and then selecting some fields from that dataframe into another one.

from pyspark.sql.functions import col

df_record = spark.read.json("path/to/file.JSON", multiLine=True)

df_basicInfo = df_record.select(col("key1").alias("ID"), \
                                col("key2").alias("Status"), \
                                col("key3.ResponseType").alias("ResponseType"), \
                                col("key3.someIndicator").alias("SomeIndicator") \
                                )

The issue is that sometimes the JSON file does not have some of the keys that I try to fetch, like ResponseType. So it ends up throwing errors like:

org.apache.spark.sql.AnalysisException: No such struct field ResponseType

How can I get around this issue without forcing a schema at the time of read? Is it possible to make it return a NULL under that column when it is not available?

The question "how do I detect if a spark dataframe has a column" does cover how to detect whether a column is available in a dataframe. This question, however, is about how to use that function.

pallupz asked Oct 04 '18

People also ask

How do I check if a column is present in a Spark data frame?

Spark Check if Column Exists in DataFrame: a Spark DataFrame has an attribute columns that returns all column names as an Array[String]. Once you have the columns, you can use the array function contains() to check if the column is present. Note that df.columns returns only top-level columns, not nested struct columns.
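For example, a quick top-level check in PySpark (the DataFrame and column names here are only illustrative) could look like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy DataFrame standing in for the real data
df = spark.createDataFrame([(1, "ok")], ["key1", "key2"])

print("key1" in df.columns)          # True: top-level column exists
print("ResponseType" in df.columns)  # False: nested struct fields are not listed by df.columns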

How do you check if a column exists in a PySpark DataFrame?

Check if a Field Exists in a DataFrame: if you want to check whether a column exists with a particular data type, use the PySpark schema functions df.schema.fieldNames() or df.schema.
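Continuing with the toy df above, a sketch of checking the schema both by name and by data type:

from pyspark.sql.types import StringType

# Top-level field names only, same caveat as df.columns
print("key1" in df.schema.fieldNames())

# Check name and data type together by walking the StructField entries
has_key2_string = any(
    f.name == "key2" and isinstance(f.dataType, StringType)
    for f in df.schema.fields
)
print(has_key2_string)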

How do I exclude a column from a DataFrame in Spark?

The Spark DataFrame provides the drop() method to drop a column or a field from the DataFrame or the Dataset. The drop() method can also be used to remove multiple columns from a Spark DataFrame or Dataset.
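For instance, again with the toy df from above (dropping a column that does not exist is simply a no-op):

# Drop a single column, or several at once
df_trimmed = df.drop("key2")
df_trimmed = df.drop("key1", "key2")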

How do you write an if condition in PySpark?

PySpark when() is a SQL function; to use it you first need to import it, and it returns a Column type. otherwise() is a function of Column; when otherwise() is not used and none of the conditions are met, it assigns None (null). Usage looks like when(condition).otherwise(default).
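A small sketch of the pattern, using the toy df and made-up condition and values:

from pyspark.sql.functions import when, col, lit

# otherwise() supplies the default for rows where no condition matched
df_labeled = df.withColumn(
    "key1_sign",
    when(col("key1") > 0, lit("positive")).otherwise(lit("non-positive"))
)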


1 Answer

Using the has_column function defined here by zero323 and general guidelines about adding empty columns, you can proceed in one of two ways (a minimal sketch of has_column is shown below).
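has_column is not built into PySpark; a minimal sketch along the lines of the linked answer (treating a failed column resolution as "not present") might look like this:

from pyspark.sql.utils import AnalysisException

def has_column(df, col_name):
    # Try to resolve the (possibly nested) column; failure means it is absent
    try:
        df[col_name]
        return True
    except AnalysisException:
        return False

With that helper in place, either add each column conditionally: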

from pyspark.sql.functions import lit, col, when
from pyspark.sql.types import *

if has_column(df_record, "key3.ResponseType"):
    df_basicInfo = df_record.withColumn("ResponseType", col("key3.ResponseType"))
else:
    # Adjust types according to your needs
    df_basicInfo = df_record.withColumn("ResponseType", lit(None).cast("string")) 

and repeat for each column you need, or

df_basicInfo = df_record.withColumn(
    "ResponseType",
    when(
        lit(has_column(df_record, "key3.ResponseType")),
        col("key3.ResponseType")
    ).otherwise(lit(None).cast("string"))
)

Adjust types according to your requirements, and repeat process for the remaining columns.

Alternatively, define a schema that covers all the desired fields:

schema = StructType([
    StructField("key1", StringType()),
    StructField("key2", StringType()),
    StructField("key2", StructType([
        StructField("ResponseType", StringType()),
        StructField("someIndicator", StringType()),
    ]))
])

df_record = spark.read.schema(schema).json("path/to/file.JSON",multiLine=True)

(once again adjust the types), and use your current code.

user10456460 answered Sep 28 '22