As shown in the code below, I am reading a JSON file into a DataFrame and then selecting some fields from that DataFrame into another one.
from pyspark.sql.functions import col

df_record = spark.read.json("path/to/file.JSON", multiLine=True)
df_basicInfo = df_record.select(
    col("key1").alias("ID"),
    col("key2").alias("Status"),
    col("key3.ResponseType").alias("ResponseType"),
    col("key3.someIndicator").alias("SomeIndicator"),
)
The issue is that sometimes the JSON file does not have some of the keys that I try to fetch, like ResponseType. So it ends up throwing errors like:
org.apache.spark.sql.AnalysisException: No such struct field ResponseType
How can I get around this issue without forcing a schema at the time of read? Is it possible to make it return NULL under that column when it is not available?
The question "how do I detect if a spark dataframe has a column" does mention how to detect whether a column is available in a DataFrame. This question, however, is about how to use that function here.
Spark Check if Column Exists in DataFrame: a Spark DataFrame has an attribute columns that returns all column names as an Array[String]; once you have them, you can use contains() to check whether a column is present. Note that df.columns returns only top-level columns, not nested struct fields.
Check if a Field Exists in a DataFrame: if you want to check whether a column exists with a particular data type, use the schema functions df.schema.fieldNames() or df.schema.
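For example (a quick sketch against the question's df_record; key3 is assumed to be a struct column):

# Top-level check: df.columns lists only top-level names
"key2" in df_record.columns                  # True
"ResponseType" in df_record.columns          # False, it lives inside key3

# Nested check: inspect the struct's schema instead
"ResponseType" in [f.name for f in df_record.schema["key3"].dataType.fields]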
PySpark when() is a SQL function, so you have to import it before use, and it returns a Column type. otherwise() is a function of Column; when otherwise() is not used and none of the conditions are met, it assigns None (null). Usage looks like when(condition).otherwise(default).
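A minimal illustration (the flag column name and the "OK" value are invented for the example):

from pyspark.sql.functions import when, col, lit

df_flagged = df_record.withColumn(
    "flag",
    when(col("key2") == "OK", lit(1)).otherwise(lit(0))  # 0 when key2 is not "OK" or is null
)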
Using the has_column function defined here by zero323 (a minimal sketch of such a helper is shown a bit further down) and the general guidelines about adding empty columns, you can either:
from pyspark.sql.functions import lit, col, when
from pyspark.sql.types import *

if has_column(df_record, "key3.ResponseType"):
    df_basicInfo = df_record.withColumn("ResponseType", col("key3.ResponseType"))
else:
    # Adjust types according to your needs
    df_basicInfo = df_record.withColumn("ResponseType", lit(None).cast("string"))
and repeat for each column you need, or
df_record.withColumn(
    "ResponseType",
    when(
        lit(has_column(df_record, "key3.ResponseType")),
        col("key3.ResponseType")
    ).otherwise(lit(None).cast("string"))
)
Adjust the types according to your requirements, and repeat the process for the remaining columns.
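For reference, a minimal sketch of such a has_column helper, assuming the try/except approach from the linked answer: it simply tries to resolve the (possibly nested) field and treats an AnalysisException as "not present".

from pyspark.sql.functions import col
from pyspark.sql.utils import AnalysisException

def has_column(df, column):
    # True if the (possibly nested) column can be resolved against df's schema
    try:
        df.select(col(column))
        return True
    except AnalysisException:
        return False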
Alternatively, define a schema that covers all the desired fields:
schema = StructType([
    StructField("key1", StringType()),
    StructField("key2", StringType()),
    StructField("key3", StructType([
        StructField("ResponseType", StringType()),
        StructField("someIndicator", StringType()),
    ]))
])
df_record = spark.read.schema(schema).json("path/to/file.JSON", multiLine=True)
(once again adjust the types), and use your current code.
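With an explicit schema in place, keys that are missing from a particular JSON file simply come back as null, so the original select works unchanged:

df_basicInfo = df_record.select(
    col("key1").alias("ID"),
    col("key2").alias("Status"),
    col("key3.ResponseType").alias("ResponseType"),    # null when the key is absent from the file
    col("key3.someIndicator").alias("SomeIndicator"),
)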