How do I add a column to a nested struct in a pyspark dataframe?

I have a dataframe with a schema like

root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)

I'd like to add columns within the state struct, that is, create a dataframe with a schema like

root
 |-- state: struct (nullable = true)
 |    |-- fld: integer (nullable = true)
 |    |-- a: integer (nullable = true)

I tried

df.withColumn('state.a', val).printSchema()
# root
#  |-- state: struct (nullable = true)
#  |    |-- fld: integer (nullable = true)
#  |-- state.a: integer (nullable = true)
asked Feb 14 '18 by MrCartoonology


3 Answers

Here is a way to do it without using a udf:

# create example dataframe
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, IntegerType
data = [
    ({'fld': 0},)
]

schema = StructType(
    [
        StructField('state',
            StructType(
                [StructField('fld', IntegerType())]
            )
        )
    ]
)

df = sqlCtx.createDataFrame(data, schema)  # sqlCtx is a SQLContext; spark.createDataFrame(data, schema) works the same way
df.printSchema()
#root
# |-- state: struct (nullable = true)
# |    |-- fld: integer (nullable = true)

Now use withColumn() and add the new field using lit() and alias().

val = 1
df_new = df.withColumn(
    'state', 
    f.struct(*[f.col('state')['fld'].alias('fld'), f.lit(val).alias('a')])
)
df_new.printSchema()
#root
# |-- state: struct (nullable = false)
# |    |-- fld: integer (nullable = true)
# |    |-- a: integer (nullable = false)
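
As a quick check (exact show() formatting can differ slightly between Spark versions), the new value now lives inside the struct:

df_new.select('state.fld', 'state.a').show()
# +---+---+
# |fld|  a|
# +---+---+
# |  0|  1|
# +---+---+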

If you have a lot of fields in the nested struct, you can use a list comprehension, with df.schema["state"].dataType.names to get the existing field names. For example:

val = 1
s_fields = df.schema["state"].dataType.names # ['fld']
df_new = df.withColumn(
    'state', 
    f.struct(*([f.col('state')[c].alias(c) for c in s_fields] + [f.lit(val).alias('a')]))
)
df_new.printSchema()
#root
# |-- state: struct (nullable = false)
# |    |-- fld: integer (nullable = true)
# |    |-- a: integer (nullable = false)
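
The same pattern can be wrapped in a small helper for reuse (a sketch built on the snippet above; add_struct_field is a made-up name, not part of the Spark API):

import pyspark.sql.functions as f

def add_struct_field(df, struct_col, field_name, value_col):
    """Return df with field_name added to the struct column struct_col, set to value_col."""
    existing = df.schema[struct_col].dataType.names  # current field names inside the struct
    return df.withColumn(
        struct_col,
        f.struct(*([f.col(struct_col)[c].alias(c) for c in existing]
                   + [value_col.alias(field_name)]))
    )

# equivalent to the example above
df_new = add_struct_field(df, 'state', 'a', f.lit(1))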

References

  • I found a way to get the field names from the Struct without naming them manually from this answer.
answered by pault


Although this is a late answer, the following works on PySpark 2.x.

Assuming dfOld already contains state and fld as asked in the question:

from pyspark.sql.functions import col, lit, struct

dfOld = dfOld.withColumn("a", lit("value"))  # expose the new value as a top-level column first
dfNew = dfOld.select("level1Field1", "level1Field2", struct(col("state.fld").alias("fld"), col("a")).alias("state"))

Reference: https://medium.com/@mrpowers/adding-structtype-columns-to-spark-dataframes-b44125409803
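
A minimal end-to-end sketch for just the state struct from the question (using lit(1) for the new value; level1Field1/level1Field2 above are placeholders for other top-level columns you may want to keep):

from pyspark.sql.functions import col, lit, struct

dfNew = dfOld.withColumn("a", lit(1)) \
             .select(struct(col("state.fld").alias("fld"), col("a")).alias("state"))
dfNew.printSchema()
#root
# |-- state: struct (nullable = false)
# |    |-- fld: integer (nullable = true)
# |    |-- a: integer (nullable = false)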

answered by desaiankitb


Use a transformation such as the following:

import pyspark.sql.functions as f

df = df.withColumn(
    "state",
    f.struct(
        f.col("state.*"),
        f.lit(123).alias("a")
    )
)
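
On Spark 3.1 and later the same thing can be done without rebuilding the whole struct, via Column.withField (a sketch; this method is not available in older releases):

import pyspark.sql.functions as f

# Spark 3.1+: withField adds (or replaces) a single field inside a struct column
df = df.withColumn("state", f.col("state").withField("a", f.lit(123)))
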
answered by malthe