I have a dataframe whose df.printSchema output looks something like this:
root
|-- ts: timestamp (nullable = true)
|-- geoip: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- continent: string (nullable = true)
| |-- location: struct (nullable = true)
| | |-- lat: float (nullable = true)
| | |-- lon: float (nullable = true)
I know that, for example, with df = df.withColumn("error", lit(null).cast(StringType)) I can add a null field called error of type String directly under the root. How could I add the same field under the geoip struct, or under the location struct? I have also tried df = df.withColumn("geoip.error", lit(null).cast(StringType)) with no luck.
TL;DR You have to map the rows in a Dataset somehow.
Use the map operation, which gives you the most flexibility since you are in total control of the final structure of the rows.
map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] (Scala-specific) Returns a new Dataset that contains the result of applying func to each element.
Your case would then look as follows:
// Create a sample dataset to work with
scala> val df = Seq("timestamp").
  toDF("ts").
  withColumn("geoip", struct(lit("Warsaw") as "city", lit("Europe") as "continent"))
df: org.apache.spark.sql.DataFrame = [ts: string, geoip: struct<city: string, continent: string>]
scala> df.show
+---------+---------------+
| ts| geoip|
+---------+---------------+
|timestamp|[Warsaw,Europe]|
+---------+---------------+
scala> df.printSchema
root
|-- ts: string (nullable = true)
|-- geoip: struct (nullable = false)
| |-- city: string (nullable = false)
| |-- continent: string (nullable = false)
val newDF = df.
  as[(String, (String, String))]. // <-- convert to typed Dataset as it makes map easier
  map { case (ts, (city, continent)) =>
    (ts, (city, continent, "New field with some value")) }. // <-- add new column
  toDF("timestamp", "geoip") // <-- name the top-level fields
scala> newDF.printSchema
root
|-- timestamp: string (nullable = true)
|-- geoip: struct (nullable = true)
| |-- _1: string (nullable = true)
| |-- _2: string (nullable = true)
| |-- _3: string (nullable = true)
That's not pretty, as you have lost the names of the nested columns.
Let's define the schema with the proper names. That's where you can use StructType with StructFields (you could also use a set of case classes; a rough sketch of that variant appears after this example, but I leave the details to you as a home exercise).
import org.apache.spark.sql.types._

// The $-notation below builds StructFields and comes from spark.implicits._
// (auto-imported in spark-shell).
val geoIP = StructType(
  $"city".string ::
  $"continent".string ::
  $"new_field".string ::
  Nil
)
val mySchema = StructType(
  $"timestamp".string ::
  $"geoip".struct(geoIP) ::
  Nil
)
scala> mySchema.printTreeString
root
|-- timestamp: string (nullable = true)
|-- geoip: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- continent: string (nullable = true)
| |-- new_field: string (nullable = true)
Apply the new schema to get the proper names.
val properNamesDF = spark.createDataFrame(newDF.rdd, mySchema)
scala> properNamesDF.show(truncate = false)
+---------+-----------------------------------------+
|timestamp|geoip |
+---------+-----------------------------------------+
|timestamp|[Warsaw,Europe,New field with some value]|
+---------+-----------------------------------------+
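As an aside, the case-class variant mentioned above could look roughly like the following. This is only a sketch: the class names GeoIP and Event are my own, not anything Spark provides, and the encoder derives the field names, so no separate StructType is needed.
case class GeoIP(city: String, continent: String, new_field: String)
case class Event(timestamp: String, geoip: GeoIP)

val caseClassDF = df.
  as[(String, (String, String))].
  map { case (ts, (city, continent)) =>
    Event(ts, GeoIP(city, continent, "New field with some value")) }. // <-- build the richer struct
  toDF // <-- field names come from the case classes
caseClassDF.printSchema should show the same shape as properNamesDF above, without the extra renaming step.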
If you feel fairly adventurous, you may want to play with StructType as a collection type and re-shape it using Scala's Collection API and the copy method on StructField.
It does not really matter how deep you want to go or what level of "struct of a struct" you want to modify. Just consider a StructType a collection of StructFields whose data types may in turn be StructTypes.
val oldSchema = newDF.schema
val names = Seq("city", "continent", "new_field")
val geoipFields = oldSchema("geoip").
  dataType.
  asInstanceOf[StructType].
  zip(names).
  map { case (field, name) => field.copy(name = name) }
val myNewSchema = StructType(
  $"timestamp".string ::
  $"geoip".struct(StructType(geoipFields)) ::
  Nil)
val properNamesDF = spark.createDataFrame(newDF.rdd, myNewSchema)
scala> properNamesDF.printSchema
root
|-- timestamp: string (nullable = true)
|-- geoip: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- continent: string (nullable = true)
| |-- new_field: string (nullable = true)
You could also use the withColumn operator with the struct function.
withColumn(colName: String, col: Column): DataFrame Returns a new Dataset by adding a column or replacing the existing column that has the same name.
struct(cols: Column*): Column Creates a new struct column.
The code could look as follows:
val anotherNewDF = df.
  withColumn("geoip", // <-- use the same column name so you hide the existing one
    struct(
      $"geoip.city", // <-- reference existing column to copy the values
      $"geoip.continent",
      lit("new value") as "new_field")) // <-- new field with fixed value
scala> anotherNewDF.printSchema
root
|-- ts: string (nullable = true)
|-- geoip: struct (nullable = false)
| |-- city: string (nullable = false)
| |-- continent: string (nullable = false)
| |-- new_field: string (nullable = false)
As per a comment from @shj, you can use a wildcard to avoid re-listing the columns, which makes it pretty flexible, e.g.
val anotherNewDF = df
  .withColumn("geoip",
    struct(
      $"geoip.*", // <-- the wildcard here
      lit("new value") as "new_field"))
You could also simply do:
df = df.withColumn("geoip", struct($"geoip.*", lit("This is fine.").alias("error")))
That adds an "error" field to the "geoip" struct.
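The same trick nests, which also covers the location struct from the original question. Here is a sketch, assuming the schema from the question (geoip.city, geoip.continent, geoip.location.lat, geoip.location.lon) and listing the inner fields explicitly; withLocationError is just my own name for the result.
import org.apache.spark.sql.functions.{lit, struct}

val withLocationError = df.withColumn("geoip",
  struct(
    $"geoip.city",
    $"geoip.continent",
    struct(
      $"geoip.location.lat",                   // <-- copy the existing inner fields
      $"geoip.location.lon",
      lit(null).cast("string").alias("error")  // <-- the new nullable string field
    ).alias("location")))
Keep in mind that if geoip or location is null for some rows, rebuilding the structs this way gives you non-null structs whose fields are all null, which may or may not be what you want.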