How to add a new field to a struct column?

I have a DataFrame whose schema, from df.printSchema, looks like this:

root
 |-- ts: timestamp (nullable = true)
 |-- geoip: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- continent: string (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- lat: float (nullable = true)
 |    |    |-- lon: float (nullable = true)

I know that, for example, df = df.withColumn("error", lit(null).cast(StringType)) adds a null field called error of type String right under the root. How could I add the same field under the geoip struct, or under the location struct?

I have also tried df = df.withColumn("geoip.error", lit(null).cast(StringType)) with no luck.

Andrea asked May 25 '17 10:05
2 Answers

TL;DR You have to map the rows in a Dataset somehow. Note that df.withColumn("geoip.error", ...) does not reach into the struct; it simply adds a new top-level column whose name literally contains a dot.

map Operator (the most flexible)

Use the map operation, which gives you the most flexibility since you are in total control of the final structure of the rows.

map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] (Scala-specific) Returns a new Dataset that contains the result of applying func to each element.

Your case would then look as follows:

// Create a sample dataset to work with
scala> val df = Seq("timestamp").
  toDF("ts").
  withColumn("geoip", struct(lit("Warsaw") as "city", lit("Europe") as "continent"))
df: org.apache.spark.sql.DataFrame = [ts: string, geoip: struct<city: string, continent: string>]

scala> df.show
+---------+---------------+
|       ts|          geoip|
+---------+---------------+
|timestamp|[Warsaw,Europe]|
+---------+---------------+

scala> df.printSchema
root
 |-- ts: string (nullable = true)
 |-- geoip: struct (nullable = false)
 |    |-- city: string (nullable = false)
 |    |-- continent: string (nullable = false)

val newDF = df.
  as[(String, (String, String))].  // <-- convert to typed Dataset as it makes map easier
  map { case (ts, (city, continent)) =>
    (ts, (city, continent, "New field with some value")) }. // <-- add new column
  toDF("timestamp", "geoip") // <-- name the top-level fields

scala> newDF.printSchema
root
 |-- timestamp: string (nullable = true)
 |-- geoip: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: string (nullable = true)
 |    |-- _3: string (nullable = true)

That's not pretty, as you lose the names of the nested fields (they become _1, _2, _3).

Let's define the schema with the proper names. That's where you can use StructType with StructFields (you could also use case classes, as sketched after the example below).

import org.apache.spark.sql.types._
val geoIP = StructType(
  $"city".string ::
  $"continent".string ::
  $"new_field".string ::
  Nil
)
val mySchema = StructType(
  $"timestamp".string ::
  $"geoip".struct(geoIP) ::
  Nil
)

scala> mySchema.printTreeString
root
 |-- timestamp: string (nullable = true)
 |-- geoip: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- continent: string (nullable = true)
 |    |-- new_field: string (nullable = true)

Apply the new schema for proper names.

val properNamesDF = spark.createDataFrame(newDF.rdd, mySchema)
scala> properNamesDF.show(truncate = false)
+---------+-----------------------------------------+
|timestamp|geoip                                    |
+---------+-----------------------------------------+
|timestamp|[Warsaw,Europe,New field with some value]|
+---------+-----------------------------------------+
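
As an aside, here is what the case-class alternative could look like. It is just a sketch; the GeoIP and Record names are made up for illustration:

// Hypothetical case classes (not part of the original answer); in spark-shell
// define them before the transformation that uses them.
case class GeoIP(city: String, continent: String, new_field: String)
case class Record(timestamp: String, geoip: GeoIP)

val typedDF = df.
  as[(String, (String, String))].
  map { case (ts, (city, continent)) =>
    Record(ts, GeoIP(city, continent, "New field with some value")) }.
  toDF() // <-- field names now come from the case classes, no manual schema needed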

How to add a field to a "struct of a struct"

If you feel fairly adventurous, you may want to treat a StructType as a collection and re-shape it using Scala's Collection API and StructField's copy method.

It does not really matter how deep you want to go or what level of "struct of a struct" you want to modify. Just consider a StructType as a collection of StructFields whose data types may in turn be StructTypes (a recursive sketch follows the example below).

val oldSchema = newDF.schema
val names = Seq("city", "continent", "new_field")
val geoipFields = oldSchema("geoip").
  dataType.
  asInstanceOf[StructType].
  zip(names).
  map { case (field, name) => field.copy(name = name) }
val myNewSchema = StructType(
  $"timestamp".string :: 
  $"geoip".struct(StructType(geoipFields)) :: Nil)
val properNamesDF = spark.createDataFrame(newDF.rdd, myNewSchema)
scala> properNamesDF.printSchema
root
 |-- timestamp: string (nullable = true)
 |-- geoip: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- continent: string (nullable = true)
 |    |-- new_field: string (nullable = true)
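
The renaming above works one level down, but the same idea extends to any depth. Here is a sketch of a recursive helper (the addFieldTo name is made up, and the example call assumes the question's schema, where geoip contains a location struct):

import org.apache.spark.sql.types._

// Walk the schema along `path` and append `field` to the struct found there.
def addFieldTo(schema: StructType, path: List[String], field: StructField): StructType =
  path match {
    case Nil => StructType(schema.fields :+ field)
    case name :: rest =>
      StructType(schema.fields.map {
        case f @ StructField(`name`, st: StructType, _, _) =>
          f.copy(dataType = addFieldTo(st, rest, field))
        case f => f
      })
  }

// e.g. a nullable error field inside geoip.location
val newSchema = addFieldTo(df.schema, List("geoip", "location"), StructField("error", StringType))

As before, this only produces the schema; the rows still have to carry a value in the new slot (e.g. via map) before you can pair them in spark.createDataFrame.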

withColumn Operator with struct Function

You could use the withColumn operator with the struct function.

withColumn(colName: String, col: Column): DataFrame Returns a new Dataset by adding a column or replacing the existing column that has the same name.

struct(cols: Column*): Column Creates a new struct column.

The code could look as follows:

val anotherNewDF = df.
  withColumn("geoip", // <-- use the same column name so you hide the existing one
    struct(
      $"geoip.city", // <-- reference existing column to copy the values
      $"geoip.continent",
      lit("new value") as "new_field")) // <-- new field with fixed value

scala> anotherNewDF.printSchema
root
 |-- ts: string (nullable = true)
 |-- geoip: struct (nullable = false)
 |    |-- city: string (nullable = false)
 |    |-- continent: string (nullable = false)
 |    |-- new_field: string (nullable = false)

As per a comment from @shj, you can use a wildcard to avoid re-listing the columns, which makes it pretty flexible, e.g.

val anotherNewDF = df
  .withColumn("geoip",
    struct(
      $"geoip.*", // <-- the wildcard here
      lit("new value") as "new_field"))

Jacek Laskowski answered Sep 18 '22 01:09


You could also simply do:

df = df.withColumn("geoip", struct($"geoip.*", lit("This is fine.").alias("error")))

That adds an "error" field to the "geoip" struct.
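
The same trick works one level deeper, e.g. for the location struct from the question. A sketch, assuming that schema (geoip holding city, continent and location):

// Rebuild geoip, replacing location with a copy that carries the extra field.
df = df.withColumn("geoip",
  struct(
    $"geoip.city",
    $"geoip.continent",
    struct(
      $"geoip.location.*", // <-- existing lat and lon
      lit(null).cast(StringType).alias("error")).alias("location")))

city and continent are listed explicitly because $"geoip.*" would also pull in the old location.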

Adrien Brunelat answered Sep 18 '22 01:09