
Spark SQL nested withColumn

I have a DataFrame with multiple columns, some of which are structs. Something like this:

root
 |-- foo: struct (nullable = true)
 |    |-- bar: string (nullable = true)
 |    |-- baz: string (nullable = true)
 |-- abc: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- def: struct (nullable = true)
 |    |    |    |-- a: string (nullable = true)
 |    |    |    |-- b: integer (nullable = true)
 |    |    |    |-- c: string (nullable = true)

I want to apply a UserDefinedFunction to the column baz, replacing baz with the function's result, but I cannot figure out how to do that. Here is an example of the desired output (note that baz is now an int):

root
 |-- foo: struct (nullable = true)
 |    |-- bar: string (nullable = true)
 |    |-- baz: int (nullable = true)
 |-- abc: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- def: struct (nullable = true)
 |    |    |    |-- a: string (nullable = true)
 |    |    |    |-- b: integer (nullable = true)
 |    |    |    |-- c: string (nullable = true)

It looks like DataFrame.withColumn only works on top-level columns, not on nested columns. I'm using Scala for this problem.
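
For reference, here is roughly what I have been trying (the UDF is just a placeholder for the real string-to-int conversion); withColumn ends up adding a new top-level column whose name contains a dot, rather than replacing the nested field:

import org.apache.spark.sql.functions.udf

// placeholder UDF standing in for the real String => Int conversion
val bazToInt = udf((s: String) => s.length)

// adds a new top-level column literally named "foo.baz" instead of replacing foo.baz
df.withColumn("foo.baz", bazToInt($"foo.baz"))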

Can someone help me out with this?

Thanks

asked Jun 29 '17 by Jon

2 Answers

That's easy: just use a dot to select nested fields, e.g. $"foo.baz":

import org.apache.spark.sql.functions.{col, struct, udf}
import spark.implicits._  // assumes an existing SparkSession named spark

case class Foo(bar: String, baz: String)
case class Record(foo: Foo)

val df = Seq(
  Record(Foo("Hi", "There"))
).toDF()


df.printSchema

root
 |-- foo: struct (nullable = true)
 |    |-- bar: string (nullable = true)
 |    |-- baz: string (nullable = true)


val myUDF = udf((s:String) => {
 // do something with s 
  s.toUpperCase
})


df
.withColumn("udfResult",myUDF($"foo.baz"))
.show

+----------+---------+
|       foo|udfResult|
+----------+---------+
|[Hi,There]|    THERE|
+----------+---------+

If you want to add the result of your UDF to the existing struct foo, i.e. to get:

root
 |-- foo: struct (nullable = false)
 |    |-- bar: string (nullable = true)
 |    |-- baz: string (nullable = true)
 |    |-- udfResult: string (nullable = true)

there are two options:

with withColumn:

df
.withColumn("udfResult",myUDF($"foo.baz"))
.withColumn("foo",struct($"foo.*",$"udfResult"))
.drop($"udfResult")

with select:

df
.select(struct($"foo.*",myUDF($"foo.baz").as("udfResult")).as("foo"))
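
Note that the select variant projects only foo, so any other top-level columns (like abc in the original schema) would be dropped unless you also select them; the withColumn variant keeps them automatically.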

EDIT: To replace the existing attribute in the struct with the result of the UDF: unfortunately, this does not work:

df
.withColumn("foo.baz",myUDF($"foo.baz")) 

but can be done like this:

// collect all fields of foo except baz
val structCols = df.select($"foo.*")
    .columns
    .filter(_ != "baz")
    .map(name => col("foo." + name))

df.withColumn(
    "foo",
    struct((structCols :+ myUDF($"foo.baz").as("baz")): _*)
)
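
The question asked for baz to end up as an int; the same pattern handles that, since the rebuilt struct field simply takes whatever type the UDF returns. A sketch with a hypothetical String => Int UDF (string length stands in for the real conversion):

// hypothetical UDF; replace s.length with the real String => Int logic
val bazToInt = udf((s: String) => s.length)

df.withColumn(
    "foo",
    struct((structCols :+ bazToInt($"foo.baz").as("baz")): _*)
)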
answered by Raphael Roth

You can do this using the struct function, as Raphael Roth has already demonstrated in their answer above. There is an easier way to do this, though, using the Make Structs Easy* library. The library adds a withField method to the Column class, allowing you to add/replace Columns inside a StructType column, in much the same way as the withColumn method on the DataFrame class allows you to add/replace columns inside a DataFrame. For your specific use-case, you could do something like this:

import org.apache.spark.sql.functions._
import com.github.fqaiser94.mse.methods._

// generate some fake data
case class Foo(bar: String, baz: String)
case class Record(foo: Foo, arrayOfFoo: Seq[Foo])

val df = Seq(
   Record(Foo("Hello", "World"), Seq(Foo("Blue", "Red"), Foo("Green", "Yellow")))
).toDF

df.printSchema

// root
//  |-- foo: struct (nullable = true)
//  |    |-- bar: string (nullable = true)
//  |    |-- baz: string (nullable = true)
//  |-- arrayOfFoo: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- bar: string (nullable = true)
//  |    |    |-- baz: string (nullable = true)

df.show(false)

// +--------------+------------------------------+
// |foo           |arrayOfFoo                    |
// +--------------+------------------------------+
// |[Hello, World]|[[Blue, Red], [Green, Yellow]]|
// +--------------+------------------------------+

// example user defined function that capitalizes a given string
val myUdf = udf((s: String) => s.toUpperCase)

// capitalize value of foo.baz
df.withColumn("foo", $"foo".withField("baz", myUdf($"foo.baz"))).show(false)

// +--------------+------------------------------+
// |foo           |arrayOfFoo                    |
// +--------------+------------------------------+
// |[Hello, WORLD]|[[Blue, Red], [Green, Yellow]]|
// +--------------+------------------------------+

I noticed you had a follow-up question about replacing a Column nested inside a struct that is itself nested inside an array. This can also be done by combining the functions provided by the Make Structs Easy library with those provided by the spark-hofs library, as follows:

import za.co.absa.spark.hofs._

// capitalize the value of foo.baz in each element of arrayOfFoo
df.withColumn("arrayOfFoo", transform($"arrayOfFoo", foo => foo.withField("baz", myUdf(foo.getField("baz"))))).show(false)

// +--------------+------------------------------+
// |foo           |arrayOfFoo                    |
// +--------------+------------------------------+
// |[Hello, World]|[[Blue, RED], [Green, YELLOW]]|
// +--------------+------------------------------+
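
As an aside, recent Spark versions have built-in equivalents: Column.withField (added in Spark 3.1, if I remember correctly) and functions.transform (Spark 3.0+), so on a new enough version you may not need the extra libraries. A minimal sketch under that assumption:

import org.apache.spark.sql.functions.transform

// replace foo.baz using the built-in Column.withField
df.withColumn("foo", $"foo".withField("baz", myUdf($"foo.baz"))).show(false)

// replace baz inside each element of arrayOfFoo using the built-in transform
df.withColumn("arrayOfFoo", transform($"arrayOfFoo", foo => foo.withField("baz", myUdf(foo.getField("baz"))))).show(false)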

*Full disclosure: I am the author of the Make Structs Easy library that is referenced in this answer.

answered by Fqp