
Automatically and Elegantly flatten DataFrame in Spark SQL

All,

Is there an elegant, accepted way to flatten a Spark SQL table (Parquet) with columns that are of nested StructType?

For example

If my schema is:

foo
 |_bar
 |_baz
x
y
z

How do I select it into a flattened tabular form without resorting to manually running

df.select("foo.bar","foo.baz","x","y","z") 

In other words, how do I obtain the result of the above code programmatically, given just a StructType and a DataFrame?

asked May 26 '16 by echen


2 Answers

The short answer is, there's no "accepted" way to do this, but you can do it very elegantly with a recursive function that generates your select(...) statement by walking through the DataFrame.schema.

The recursive function should return an Array[Column]. Every time the function hits a StructType, it would call itself and append the returned Array[Column] to its own Array[Column].

Something like:

import org.apache.spark.sql.Column
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.col

def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)

    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName))
    }
  })
}

You would then use it like this:

df.select(flattenSchema(df.schema):_*) 
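One follow-up worth knowing about: selecting col("foo.bar") yields a column named just bar, so if two different structs share a leaf field name, the flattened DataFrame ends up with duplicate column names. A minimal variant (a sketch, not part of the original answer; the underscore separator is an arbitrary choice, and the imports are the same as above) aliases each leaf with its full path:

def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      // alias each leaf so that e.g. "foo.bar" comes out as "foo_bar"
      case _ => Array(col(colName).as(colName.replace(".", "_")))
    }
  })
}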
answered Oct 21 '22 by David Griffin


Just wanted to share my solution for PySpark: it's more or less a translation of @David Griffin's solution, so it supports any level of nested objects.

from pyspark.sql.types import StructType, ArrayType

def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType
        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)
    return fields

df.select(flatten(df.schema)).show()
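One caveat (an observation, not part of the original answer): because this version descends into ArrayType elements, a path that passes through an array, such as someArray.field, selects a single array column holding that field's value for every element, rather than one row per element. If you want one row per element instead, explode the array column first.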
answered Oct 21 '22 by Evan V