All,
Is there an elegant and accepted way to flatten a Spark SQL table (Parquet) with columns that are of nested StructType?
For example, if my schema is:

foo
 |_bar
 |_baz
x
y
z
How do I select it into a flattened tabular form without resorting to manually running

df.select("foo.bar", "foo.baz", "x", "y", "z")

In other words, how do I obtain the result of the above code programmatically, given just a StructType and a DataFrame?
Note that two related built-ins do something different from what is asked here: Spark SQL's flatten function only converts a nested array column, e.g. ArrayType(ArrayType(StringType)), into a single array column, and explode turns an ArrayType(StructType) column into one row per struct element. Neither promotes nested struct fields to top-level columns.
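As a quick, illustrative sketch of that difference (the data below is made up, and a SparkSession named spark is assumed):

// Minimal sketch, assuming a SparkSession named `spark`; the data is made up.
import org.apache.spark.sql.functions.{explode, flatten}
import spark.implicits._

// flatten: array of arrays -> a single array; the result is still one column
Seq(Seq(Seq(1, 2), Seq(3))).toDF("nested")
  .select(flatten($"nested").as("flat"))
  .show()   // one row containing [1, 2, 3]

// explode: array of structs -> one row per element; `item` is still a struct
Seq((1, Seq(("a", 10), ("b", 20)))).toDF("id", "items")
  .select($"id", explode($"items").as("item"))
  .show()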
The short answer is, there's no "accepted" way to do this, but you can do it very elegantly with a recursive function that generates your select(...) statement by walking through the DataFrame.schema.

The recursive function should return an Array[Column]. Every time the function hits a StructType, it would call itself and append the returned Array[Column] to its own Array[Column].
Something like:
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.col

def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    // Build the fully qualified column name, e.g. "foo.bar"
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)

    f.dataType match {
      // Recurse into nested structs and collect their leaf columns
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName))
    }
  })
}
You would then use it like this:
df.select(flattenSchema(df.schema):_*)
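As a quick, hypothetical check of the behaviour, here is a minimal sketch that mirrors the schema from the question; it assumes a SparkSession named spark is in scope and uses the flattenSchema function above (the case class and value names are made up):

// Minimal sketch, assuming a SparkSession named `spark` and the
// flattenSchema function defined above; the case classes just mirror
// the schema in the question (foo.bar, foo.baz, x, y, z).
import spark.implicits._

case class Foo(bar: String, baz: Long)
case class Record(foo: Foo, x: Long, y: Long, z: Long)

val df = Seq(Record(Foo("a", 1L), 10L, 20L, 30L)).toDF()

// The nested fields come back under their leaf names: bar, baz, x, y, z
df.select(flattenSchema(df.schema): _*).show()
// -> one row with columns bar, baz, x, y, z: ("a", 1, 10, 20, 30)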
Just wanted to share my solution for Pyspark - it's more or less a translation of @David Griffin's solution, so it supports any level of nested objects.
from pyspark.sql.types import StructType, ArrayType

def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        # Unwrap arrays so that arrays of structs are traversed as well
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType
        if isinstance(dtype, StructType):
            # Recurse into nested structs, collecting their leaf field names
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)
    return fields

df.select(flatten(df.schema)).show()