I have a DataFrame that has multiple columns of which some of them are structs. Something like this
root
|-- foo: struct (nullable = true)
| |-- bar: string (nullable = true)
| |-- baz: string (nullable = true)
|-- abc: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- def: struct (nullable = true)
| | | |-- a: string (nullable = true)
| | | |-- b: integer (nullable = true)
| | | |-- c: string (nullable = true)
I want to apply a UserDefinedFunction
on the column baz
to replace baz
with a function of baz
, but I cannot figure out how to do that. Here is an example of the desired output (note that baz
is now an int
)
root
|-- foo: struct (nullable = true)
| |-- bar: string (nullable = true)
| |-- baz: int (nullable = true)
|-- abc: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- def: struct (nullable = true)
| | | |-- a: string (nullable = true)
| | | |-- b: integer (nullable = true)
| | | |-- c: string (nullable = true)
It looks like DataFrame.withColumn
only works on top level columns but not on nested columns. I'm using Scala for this problem.
Can someone help me out with this?
Thanks
that's easy, just use a dot to select nested structures, e.g. $"foo.baz"
:
case class Foo(bar:String,baz:String)
case class Record(foo:Foo)
val df = Seq(
Record(Foo("Hi","There"))
).toDF()
df.printSchema
root
|-- foo: struct (nullable = true)
| |-- bar: string (nullable = true)
| |-- baz: string (nullable = true)
val myUDF = udf((s:String) => {
// do something with s
s.toUpperCase
})
df
.withColumn("udfResult",myUDF($"foo.baz"))
.show
+----------+---------+
| foo|udfResult|
+----------+---------+
|[Hi,There]| THERE|
+----------+---------+
If you want to add the result of your UDF to the existing struct foo
, i.e. to get:
root
|-- foo: struct (nullable = false)
| |-- bar: string (nullable = true)
| |-- baz: string (nullable = true)
| |-- udfResult: string (nullable = true)
there are two options:
with withColumn
:
df
.withColumn("udfResult",myUDF($"foo.baz"))
.withColumn("foo",struct($"foo.*",$"udfResult"))
.drop($"udfResult")
with select
:
df
.select(struct($"foo.*",myUDF($"foo.baz").as("udfResult")).as("foo"))
EDIT: Replacing the existing attribute in the struct with the result from the UDF: unfortunately, this does not work:
df
.withColumn("foo.baz",myUDF($"foo.baz"))
but can be done like this:
// get all columns except foo.baz
val structCols = df.select($"foo.*")
.columns
.filter(_!="baz")
.map(name => col("foo."+name))
df.withColumn(
"foo",
struct((structCols:+myUDF($"foo.baz").as("baz")):_*)
)
You can do this using the struct
function as Raphael Roth has already been demonstrated in their answer above. There is an easier way to do this though using the Make Structs Easy* library. The library adds a withField
method to the Column class allowing you to add/replace Columns inside a StructType column, in much the same way as the withColumn
method on the DataFrame class allows you to add/replace columns inside a DataFrame. For your specific use-case, you could do something like this:
import org.apache.spark.sql.functions._
import com.github.fqaiser94.mse.methods._
// generate some fake data
case class Foo(bar: String, baz: String)
case class Record(foo: Foo, arrayOfFoo: Seq[Foo])
val df = Seq(
Record(Foo("Hello", "World"), Seq(Foo("Blue", "Red"), Foo("Green", "Yellow")))
).toDF
df.printSchema
// root
// |-- foo: struct (nullable = true)
// | |-- bar: string (nullable = true)
// | |-- baz: string (nullable = true)
// |-- arrayOfFoo: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- bar: string (nullable = true)
// | | |-- baz: string (nullable = true)
df.show(false)
// +--------------+------------------------------+
// |foo |arrayOfFoo |
// +--------------+------------------------------+
// |[Hello, World]|[[Blue, Red], [Green, Yellow]]|
// +--------------+------------------------------+
// example user defined function that capitalizes a given string
val myUdf = udf((s: String) => s.toUpperCase)
// capitalize value of foo.baz
df.withColumn("foo", $"foo".withField("baz", myUdf($"foo.baz"))).show(false)
// +--------------+------------------------------+
// |foo |arrayOfFoo |
// +--------------+------------------------------+
// |[Hello, WORLD]|[[Blue, Red], [Green, Yellow]]|
// +--------------+------------------------------+
I noticed you had a follow-up question about replacing a Column nested inside a struct nested inside of an array. This can also be done by combining the functions provided by the Make Structs Easy library with the functions provided by spark-hofs library, as follows:
import za.co.absa.spark.hofs._
// capitalize the value of foo.baz in each element of arrayOfFoo
df.withColumn("arrayOfFoo", transform($"arrayOfFoo", foo => foo.withField("baz", myUdf(foo.getField("baz"))))).show(false)
// +--------------+------------------------------+
// |foo |arrayOfFoo |
// +--------------+------------------------------+
// |[Hello, World]|[[Blue, RED], [Green, YELLOW]]|
// +--------------+------------------------------+
*Full disclosure: I am the author of the Make Structs Easy library that is referenced in this answer.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With