
Flatten nested array in Spark DataFrame

I'm reading in some JSON of the form:

{"a": [{"b": {"c": 1, "d": 2}}]}

That is, the array items are unnecessarily nested. Because the nesting happens inside an array, the answers given in "How to flatten a struct in a Spark dataframe?" don't apply directly.

This is how the dataframe looks when parsed:

root
|-- a: array
|    |-- element: struct
|    |    |-- b: struct
|    |    |    |-- c: integer
|    |    |    |-- d: integer

I'm looking to transform the dataframe into this:

root
|-- a: array
|    |-- element: struct
|    |    |-- b_c: integer
|    |    |-- b_d: integer

How do I go about aliasing the columns inside the array to effectively unnest it?

malthe asked May 05 '26 22:05


2 Answers

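You can use the higher-order function transform (available in Spark SQL since 2.4), which applies a lambda to each element of the array: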

df2 = df.selectExpr("transform(a, x -> struct(x.b.c as b_c, x.b.d as b_d)) as a")
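
When the nested struct has many fields, writing the expression by hand gets tedious. As a minimal sketch (the helper name build_unnest_expr and its parameters are made up for illustration and are not part of Spark), the same string can be generated from the field names:

```python
def build_unnest_expr(array_col, struct_field, leaf_fields, var="x"):
    """Build a Spark SQL transform() expression that lifts the leaves of one
    nested struct into the array element, prefixed with the struct's name.

    Hypothetical helper: it only assembles a string and never touches Spark.
    """
    # One "x.b.c as b_c" item per leaf field of the nested struct.
    items = ", ".join(
        f"{var}.{struct_field}.{leaf} as {struct_field}_{leaf}"
        for leaf in leaf_fields
    )
    return f"transform({array_col}, {var} -> struct({items})) as {array_col}"


expr = build_unnest_expr("a", "b", ["c", "d"])
print(expr)
# transform(a, x -> struct(x.b.c as b_c, x.b.d as b_d)) as a
```

The resulting string can be passed to df.selectExpr(expr) exactly as above. Field names are interpolated unquoted, so names containing dots or spaces would need backtick quoting first.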
mck answered May 08 '26 14:05


Using the method presented in the accepted answer, I wrote a function that recursively unnests a dataframe (recursing into nested arrays as well):

from pyspark.sql.types import ArrayType, StructType

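def flatten(df, sentinel="x"):
    # `sentinel` is used both as the lambda variable inside transform() and as
    # the name of the temporary struct column that is expanded at the end.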
    def _gen_flatten_expr(schema, indent, parents, last, transform=False):
        def handle(field, last):
            path = parents + (field.name,)
            alias = (
                " as "
                + "_".join(path[1:] if transform else path)
                + ("," if not last else "")
            )
            if isinstance(field.dataType, StructType):
                yield from _gen_flatten_expr(
                    field.dataType, indent, path, last, transform
                )
            elif (
                isinstance(field.dataType, ArrayType) and
                isinstance(field.dataType.elementType, StructType)
            ):
                yield indent, "transform("
                yield indent + 1, ".".join(path) + ","
                yield indent + 1, sentinel + " -> struct("
                yield from _gen_flatten_expr(
                    field.dataType.elementType, 
                    indent + 2, 
                    (sentinel,), 
                    True, 
                    True
                )
                yield indent + 1, ")"
                yield indent, ")" + alias
            else:
                yield (indent, ".".join(path) + alias)

        try:
            *fields, last_field = schema.fields
        except ValueError:
            pass
        else:
            for field in fields:
                yield from handle(field, False)
            yield from handle(last_field, last)

    lines = []
    for indent, line in _gen_flatten_expr(df.schema, 0, (), True):
        spaces = " " * 4 * indent
        lines.append(spaces + line)

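    # Wrap all generated expressions in one struct, then expand that struct
    # back into top-level columns.
    expr = "struct(" + "\n".join(lines) + ") as " + sentinel
    return df.selectExpr(expr).select(sentinel + ".*")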
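
To make the naming rule easier to check without a Spark session, here is a plain-Python model of what the function is meant to do to a single parsed JSON record. This is only an illustration under my own assumptions (flatten_record is a made-up name, and it works on dicts and lists rather than on a DataFrame): struct keys are joined with "_", and names restart inside each element of an array of structs.

```python
def flatten_record(record, prefix=()):
    """Plain-Python model of the naming rule, not Spark code.

    Nested dicts (structs) are merged into their parent with "_"-joined keys;
    lists of dicts (arrays of structs) are flattened element by element.
    """
    out = {}
    for key, value in record.items():
        path = prefix + (key,)
        if isinstance(value, dict):
            # Nested struct: keep descending and extend the name prefix.
            out.update(flatten_record(value, path))
        elif isinstance(value, list) and all(isinstance(v, dict) for v in value):
            # Array of structs: names restart inside each element.
            out["_".join(path)] = [flatten_record(v) for v in value]
        else:
            out["_".join(path)] = value
    return out


print(flatten_record({"a": [{"b": {"c": 1, "d": 2}}]}))
# {'a': [{'b_c': 1, 'b_d': 2}]}
```

For the record from the question this gives the b_c and b_d fields inside the array, which is the shape the Spark version should produce as well.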
malthe answered May 08 '26 13:05


