 

PySpark DataFrame: add a column if it doesn't exist

I have JSON data in several JSON files, and the keys can differ from line to line, for example:

{"a":1 , "b":"abc", "c":"abc2", "d":"abc3"}
{"a":1 , "b":"abc2", "d":"abc"}
{"a":1 ,"b":"abc", "c":"abc2", "d":"abc3"}

I want to aggregate the data on columns 'b', 'c', 'd' and 'f'. Column 'f' is not present in the given JSON file but could be present in the other files, so when it is missing I want to use an empty string for that column.

I am reading the input file and aggregating the data like this:

import pyspark.sql.functions as f
df = spark.read.json(inputfile)
df2 = df.groupby("b", "c", "d", "f").agg(f.sum(df["a"]))

This is the final output I want:

{"a":2 , "b":"abc", "c":"abc2", "d":"abc3","f":"" }
{"a":1 , "b":"abc2", "c":"" ,"d":"abc","f":""}

Can anyone please help? Thanks in advance!

gashu asked Mar 01 '17 08:03




2 Answers

You can check whether the column is available in the DataFrame and modify df only if necessary:

if 'f' not in df.columns:
    # f is pyspark.sql.functions (imported as in the question); add 'f' as an empty-string column
    df = df.withColumn('f', f.lit(''))

For nested schemas you may need to use df.schema like below:

>>> df.printSchema()
root
 |-- a: struct (nullable = true)
 |    |-- b: long (nullable = true)

>>> 'b' in df.schema['a'].dataType.names
True
>>> 'x' in df.schema['a'].dataType.names
False
Mariusz answered Nov 02 '22 19:11


In case someone needs this in Scala:

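import org.apache.spark.sql.functions.lit

// Add "f" as an empty-string column only when it is missing; otherwise reuse df as-is.
// (A val declared inside the if block would not be visible outside that block.)
val newDf = if (!df.columns.contains("f")) df.withColumn("f", lit("")) else df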
Javier Montón answered Nov 02 '22 21:11