Rename nested struct columns in a Spark DataFrame [duplicate]

I am trying to rename the columns of a DataFrame in Scala. I can easily rename the top-level columns, but I'm having difficulty with nested struct (and array-of-struct) columns.

Below is my DataFrame schema.

|-- _VkjLmnVop: string (nullable = true)
|-- _KaTasLop: string (nullable = true)
|-- AbcDef: struct (nullable = true)
|    |-- UvwXyz: struct (nullable = true)
|    |    |-- _MnoPqrstUv: string (nullable = true)
|    |    |-- _ManDevyIxyz: string (nullable = true)

But I need the schema to look like this:

|-- vkj_lmn_vop: string (nullable = true)
|-- ka_tas_lop: string (nullable = true)
|-- abc_def: struct (nullable = true)
|    |-- uvw_xyz: struct (nullable = true)
|    |    |-- mno_pqrst_uv: string (nullable = true)
|    |    |-- man_devy_ixyz: string (nullable = true)

For non-struct columns I'm changing the column names as shown below:

def aliasAllColumns(df: DataFrame): DataFrame = {
  df.select(df.columns.map { c =>
    df.col(c)
      .as(
        c.replaceAll("_", "")            // strip existing underscores
          .replaceAll("([A-Z])", "_$1")  // prefix each capital letter with "_"
          .toLowerCase                   // lower-case everything
          .replaceFirst("_", ""))        // drop the leading "_" added above
  }: _*)
}
aliasAllColumns(file_data_df).show(1)

How can I change the struct column names dynamically?

asked Feb 04 '23 by Vijay

1 Answer

You can create a recursive method to traverse the DataFrame schema for renaming the columns:

import org.apache.spark.sql.types._

def renameAllCols(schema: StructType, rename: String => String): StructType = {
  def recurRename(schema: StructType): Seq[StructField] = schema.fields.map {
    // nested struct: rename this field and recurse into its sub-fields
    case StructField(name, dtype: StructType, nullable, meta) =>
      StructField(rename(name), StructType(recurRename(dtype)), nullable, meta)
    // array of struct: rename this field and recurse into the element type
    case StructField(name, ArrayType(etype: StructType, containsNull), nullable, meta) =>
      StructField(rename(name), ArrayType(StructType(recurRename(etype)), containsNull), nullable, meta)
    // leaf field: only rename
    case StructField(name, dtype, nullable, meta) =>
      StructField(rename(name), dtype, nullable, meta)
  }
  StructType(recurRename(schema))
}

Testing it with the following example:

import org.apache.spark.sql.functions._
import spark.implicits._

val renameFcn = (s: String) =>
  s.replace("_", "").replaceAll("([A-Z])", "_$1").toLowerCase.dropWhile(_ == '_')

case class C(A_Bc: Int, D_Ef: Int)

val df = Seq(
  (10, "a", C(1, 2), Seq(C(11, 12), C(13, 14)), Seq(101, 102)),
  (20, "b", C(3, 4), Seq(C(15, 16)), Seq(103))
).toDF("_VkjLmnVop", "_KaTasLop", "AbcDef", "ArrStruct", "ArrInt")

val newDF = spark.createDataFrame(df.rdd, renameAllCols(df.schema, renameFcn))

newDF.printSchema
// root
//  |-- vkj_lmn_vop: integer (nullable = false)
//  |-- ka_tas_lop: string (nullable = true)
//  |-- abc_def: struct (nullable = true)
//  |    |-- a_bc: integer (nullable = false)
//  |    |-- d_ef: integer (nullable = false)
//  |-- arr_struct: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- a_bc: integer (nullable = false)
//  |    |    |-- d_ef: integer (nullable = false)
//  |-- arr_int: array (nullable = true)
//  |    |-- element: integer (containsNull = false)
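
If you'd rather avoid the DataFrame-to-RDD round-trip, one possible alternative is to cast each top-level column to its renamed type and select the results. This is only a minimal sketch: it assumes Spark allows casting between struct (and array-of-struct) types that differ only in field names, and the nullability flags may not be preserved exactly. The names renamedSchema and castDF are just illustrative.

import org.apache.spark.sql.functions.col

val renamedSchema = renameAllCols(df.schema, renameFcn)

val castDF = df.select(df.schema.fields.zip(renamedSchema.fields).map {
  case (oldField, newField) =>
    // e.g. cast struct<A_Bc:int,D_Ef:int> to struct<a_bc:int,d_ef:int>
    col(oldField.name).cast(newField.dataType).as(newField.name)
}: _*)

castDF.printSchema  // should show the same renamed schema as newDF above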

answered Feb 06 '23 by Leo C