I have a dataframe with a schema like
root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)
I'd like to add columns within the state struct, that is, create a dataframe with a schema like
root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)
 |    |-- a: integer (nullable = true)
I tried
df.withColumn('state.a', val).printSchema()
# root
#  |-- state: struct (nullable = true)
#  |    |-- fld: integer (nullable = true)
#  |-- state.a: integer (nullable = true)
but that creates a new top-level column literally named state.a instead of adding a field to the struct.
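(A quick sketch to confirm what happened: the dot became part of the name, so the misplaced column can only be referenced with backticks, Spark's escape for literal dots in column names.)
import pyspark.sql.functions as f
df2 = df.withColumn('state.a', f.lit(1))
df2.select('`state.a`').printSchema()  # resolves: one top-level column named "state.a"
# df2.select('state.a')                # fails: the struct has no field `a`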
Here is a way to do it without using a udf:
# create example dataframe
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, IntegerType

data = [
    ({'fld': 0},)
]

schema = StructType(
    [
        StructField(
            'state',
            StructType(
                [StructField('fld', IntegerType())]
            )
        )
    ]
)

# sqlCtx is a SQLContext; on Spark 2.x+ you can use spark.createDataFrame instead
df = sqlCtx.createDataFrame(data, schema)
df.printSchema()
# root
#  |-- state: struct (nullable = true)
#  |    |-- fld: integer (nullable = true)
Now use withColumn() and add the new field using lit() and alias().
val = 1
df_new = df.withColumn(
    'state',
    f.struct(f.col('state')['fld'].alias('fld'), f.lit(val).alias('a'))
)
df_new.printSchema()
# root
#  |-- state: struct (nullable = false)
#  |    |-- fld: integer (nullable = true)
#  |    |-- a: integer (nullable = false)
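To check the values as well as the schema (a quick sketch):
df_new.select('state.fld', 'state.a').show()  # one row: fld=0, a=1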
If you have a lot of fields in the nested struct, you can use a list comprehension, using df.schema["state"].dataType.names to get the field names. For example:
val = 1
s_fields = df.schema["state"].dataType.names # ['fld']
df_new = df.withColumn(
    'state',
    f.struct(*([f.col('state')[c].alias(c) for c in s_fields] + [f.lit(val).alias('a')]))
)
df_new.printSchema()
# root
#  |-- state: struct (nullable = false)
#  |    |-- fld: integer (nullable = true)
#  |    |-- a: integer (nullable = false)
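If this comes up repeatedly, the list-comprehension pattern wraps neatly into a small helper (with_struct_field below is a hypothetical name, not part of any Spark API):
import pyspark.sql.functions as f

def with_struct_field(df, struct_name, field_name, value_col):
    # append value_col as a new field inside the struct column struct_name,
    # preserving all of the struct's existing fields
    fields = df.schema[struct_name].dataType.names
    return df.withColumn(
        struct_name,
        f.struct(*([f.col(struct_name)[c].alias(c) for c in fields]
                   + [value_col.alias(field_name)]))
    )

df_new = with_struct_field(df, 'state', 'a', f.lit(1))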
Although this is a late answer: for PySpark 2.x, the following is supported. Assume dfOld already contains state and fld as asked in the question.
from pyspark.sql.functions import col, lit, struct

# withColumn returns a new dataframe, so capture the result; a plain Python
# value must also be wrapped in lit()
dfOld = dfOld.withColumn("a", lit("value"))

# "level1Field1" and "level1Field2" stand in for whatever other top-level
# columns you want to keep
dfNew = dfOld.select("level1Field1", "level1Field2",
                     struct(col("state.fld").alias("fld"), col("a")).alias("state"))
Reference: https://medium.com/@mrpowers/adding-structtype-columns-to-spark-dataframes-b44125409803
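Applied to the question's dataframe, the same two-step idea looks like this (a minimal sketch; state is the only top-level column there, so nothing else needs to be carried through the select):
import pyspark.sql.functions as f

dfOld = df.withColumn('a', f.lit(1))  # step 1: add `a` at the top level
dfNew = dfOld.select(
    f.struct(f.col('state.fld').alias('fld'), f.col('a')).alias('state')  # step 2: repack
)
dfNew.printSchema()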
Use a transformation such as the following:
import pyspark.sql.functions as f

df = df.withColumn(
    "state",
    f.struct(
        f.col("state.*"),       # expands to all existing fields of the struct
        f.lit(123).alias("a")
    )
)
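On Spark 3.1 and later, Column.withField does the same in one step, with no need to rebuild the struct by hand:
import pyspark.sql.functions as f

# withField (added in Spark 3.1) appends or replaces a single field of a struct column
df = df.withColumn("state", f.col("state").withField("a", f.lit(123)))
df.printSchema()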