I have a nested schema that contains an array:
root
|-- alarm_time: string (nullable = true)
|-- alarm_id: string (nullable = true)
|-- user: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- family: string (nullable = true)
| |-- address: struct (nullable = true)
| | |-- postalcode: string (nullable = true)
| | |-- line1: string (nullable = true)
| | |-- city: string (nullable = true)
| | |-- country: string (nullable = true)
|-- device: struct (nullable = true)
| |-- device_usage: string (nullable = true)
| |-- device_id: string (nullable = true)
|-- alarm_info: struct (nullable = true)
| |-- type: string (nullable = true)
| |-- reason: string (nullable = true)
| |-- data: struct (nullable = true)
| | |-- alarm_severity: long (nullable = true)
| | |-- extra_info: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- producer: string (nullable = true)
| | | | |-- comment: string (nullable = true)
I used to ignore array fields and used this code to flatten my schema:
/**
 * Recursively walks `schema` and returns one column reference per non-struct field.
 *
 * Struct fields are descended into; every other field (arrays included) is
 * returned as `col(<dotted path>)` without being expanded.
 *
 * @param schema the struct type to walk
 * @param prefix dotted path of the enclosing struct, or `null` at the top level
 * @return the column references in schema order
 */
def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  // "" at the top level, otherwise "<prefix>." so it can simply be prepended.
  val qualifier = Option(prefix).map(_ + ".").getOrElse("")
  schema.fields.flatMap { field =>
    val path = qualifier + field.name
    field.dataType match {
      case nested: StructType => flattenSchema(nested, path)
      case _ => Array(col(path))
    }
  }
}
I use it like df.select(flattenSchema(df.schema):_*), but now I have a use case that needs to keep the array data too. The only thing I can think of is to explode the array and keep multiple rows, but I have had no luck. Since I am passing the columns as varargs, I cannot pass another argument.
How can I achieve this (to have flattened schema with an exploded array)?
Am1rr3zA, the solution you have provided would break if we have two arrays at the same level. It would not allow two explosions at the same time: "Only one generator allowed per select clause but found 2: explode(_1), explode(_2)"
I have updated the solution to keep track of complex types within the nesting:
/**
 * Flattens nested structs and explodes arrays until `flattenSchema` reports no
 * more array columns.
 *
 * Struct fields become top-level columns whose names use `_` in place of `.`.
 * Array columns are exploded with `explode_outer`, one array per `select`,
 * because Spark allows only one generator per select clause. Each explosion is
 * applied to the same DataFrame that holds the scalar columns, so every
 * exploded element stays on the row it came from. (The previous implementation
 * cross-joined `df.select(simple)` with `df.select(explode(...))`, which paired
 * every row with the elements of all rows.)
 *
 * @param df the DataFrame to flatten
 * @return `df` itself when `isNested(df)` is false; otherwise a DataFrame with
 *         the flattened scalar columns followed by the exploded array columns
 */
def flattenDataFrame(df: DataFrame): DataFrame = {
  // Dotted paths of every array field, in the same traversal order that
  // flattenSchema uses, so that index i here matches the i-th complex column.
  def arrayPaths(schema: StructType, prefix: Option[String] = None): Array[String] =
    schema.fields.flatMap { field =>
      val path = prefix.fold(field.name)(p => s"$p.${field.name}")
      field.dataType match {
        case _: ArrayType => Array(path)
        case nested: StructType => arrayPaths(nested, Some(path))
        case _ => Array.empty[String]
      }
    }

  if (!isNested(df)) {
    df
  } else {
    val (complexColumns, simpleColumns) =
      flattenSchema(df.schema).partition { case (_, isComplex) => isComplex }
    val scalars: Array[Column] = simpleColumns.map(_._1)

    complexColumns.headOption match {
      case None =>
        // Nothing left to explode: just project the flattened scalar columns.
        df.select(scalars: _*)
      case Some((firstExplode, _)) =>
        // Explode only the first array now; carry the remaining arrays along
        // unexploded (under their flattened names) so the next pass handles them.
        val pendingArrays: Array[Column] =
          arrayPaths(df.schema).drop(1).map(path => col(path).as(path.replace(".", "_")))
        val selection: Array[Column] = (scalars :+ firstExplode) ++ pendingArrays
        flattenDataFrame(df.select(selection: _*))
    }
  }
}
/**
 * Recursively walks `schema` and returns one `(column, isComplex)` pair per leaf field.
 *
 *  - Array fields yield `explode_outer(col(path))` aliased to the path with
 *    `.` replaced by `_`, flagged `true`.
 *  - Struct fields are descended into and contribute their own leaves.
 *  - Every other field yields `col(path)` aliased the same way, carrying
 *    metadata `encoding = "ZSTD"`, flagged `false`.
 *
 * @param schema the struct type to walk
 * @param prefix dotted path of the enclosing struct, or `null` at the top level
 * @return the pairs in schema order
 */
private def flattenSchema(schema: StructType, prefix: String = null): Array[(Column, Boolean)] = {
  // The former trailing `.filter(field => field != None)` was removed: a tuple
  // never equals None, so it kept every element.
  schema.fields.flatMap { field =>
    val columnName = if (prefix == null) field.name else s"$prefix.${field.name}"
    val columnNameWithUnderscores = columnName.replace(".", "_")
    field.dataType match {
      case _: ArrayType =>
        Array[(Column, Boolean)]((explode_outer(col(columnName)).as(columnNameWithUnderscores), true))
      case structType: StructType =>
        flattenSchema(structType, columnName)
      case _ =>
        val metadata = new MetadataBuilder().putString("encoding", "ZSTD").build()
        Array[(Column, Boolean)]((col(columnName).as(columnNameWithUnderscores, metadata), false))
    }
  }
}
/**
 * Reports whether any top-level field of `df` has an array, map or struct type.
 *
 * @param df the DataFrame whose schema is inspected
 * @return `true` if at least one top-level field is an `ArrayType`, `MapType`
 *         or `StructType`; `false` otherwise
 */
def isNested(df: DataFrame): Boolean =
  df.schema.fields.map(_.dataType).exists {
    case _: ArrayType | _: MapType | _: StructType => true
    case _ => false
  }
If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow. Thank you!
Donate Us With