I have JSON data in various JSON files, and the keys can differ between lines, for example:
{"a":1 , "b":"abc", "c":"abc2", "d":"abc3"}
{"a":1 , "b":"abc2", "d":"abc"}
{"a":1 ,"b":"abc", "c":"abc2", "d":"abc3"}
I want to aggregate the data on columns 'b', 'c', 'd', and 'f'. Column 'f' is not present in the given JSON file but could be present in other files, so where it is missing we can use an empty string for that column.
I am reading the input file and aggregating the data like this:
import pyspark.sql.functions as f
df = spark.read.json(inputfile)
df2 = df.groupby("b", "c", "d", "f").agg(f.sum(df["a"]))
This is the final output I want:
{"a":2 , "b":"abc", "c":"abc2", "d":"abc3","f":"" }
{"a":1 , "b":"abc2", "c":"" ,"d":"abc","f":""}
Can anyone please help? Thanks in advance!
In PySpark, to add a new column to a DataFrame, use the lit() function (from pyspark.sql.functions import lit). lit() takes the constant value you want to add and returns a Column; to add a NULL/None value, use lit(None).
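For example, a minimal sketch (assuming the DataFrame df and column name 'f' from the question; 'g' is a hypothetical name used only for illustration):

from pyspark.sql.functions import lit

# lit('') wraps the constant in a Column; attach it as a new column 'f'
df = df.withColumn('f', lit(''))
# to add a NULL column instead, optionally with an explicit type:
df = df.withColumn('g', lit(None).cast('string'))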
To check whether a field exists in a DataFrame (including its data type), use the PySpark schema functions df.schema.fieldNames() or df.schema; for a simple name check, df.columns is enough.
You can check if a column is available in the DataFrame and modify df only if necessary:
if 'f' not in df.columns:
    df = df.withColumn('f', f.lit(''))
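Putting it together for the aggregation in the question, a minimal sketch (assuming spark and inputfile as defined there; missing grouping columns get an empty string before the groupby):

import pyspark.sql.functions as f

df = spark.read.json(inputfile)

# add any required grouping column that this file is missing
for col_name in ['b', 'c', 'd', 'f']:
    if col_name not in df.columns:
        df = df.withColumn(col_name, f.lit(''))

# nulls inside existing columns (e.g. 'c' in the second record) also become ''
df = df.fillna('', subset=['b', 'c', 'd', 'f'])

df2 = df.groupby('b', 'c', 'd', 'f').agg(f.sum('a').alias('a'))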
For nested schemas you may need to use df.schema, like below:
>>> df.printSchema()
root
|-- a: struct (nullable = true)
| |-- b: long (nullable = true)
>>> 'b' in df.schema['a'].dataType.names
True
>>> 'x' in df.schema['a'].dataType.names
False
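If the missing field is nested inside a struct, you have to rebuild the struct; a sketch assuming the schema above (struct column 'a', hypothetical new field 'x') that relies on star expansion inside struct():

import pyspark.sql.functions as f

if 'x' not in df.schema['a'].dataType.names:
    # recreate struct 'a' from its existing fields plus an empty string field 'x'
    df = df.withColumn('a', f.struct(f.col('a.*'), f.lit('').alias('x')))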
In case someone needs this in Scala:
import org.apache.spark.sql.functions.lit

// note: binding the result in an expression keeps newDf in scope
// (a val declared inside the if block would not be visible outside it)
val newDf = if (!df.columns.contains("f")) df.withColumn("f", lit("")) else df