Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to match Dataframe column names to Scala case class attributes?

The column names in this example from spark-sql come from the case class Person.

case class Person(name: String, age: Int)

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")

https://spark.apache.org/docs/1.1.0/sql-programming-guide.html

However in many cases the parameter names may be changed. This would cause columns to not be found if the file has not been updated to reflect the change.

How can I specify an appropriate mapping?

I am thinking something like:

  val schema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("age", IntegerType, nullable = false)
  ))


  val ps: Seq[Person] = ???

  val personRDD = sc.parallelize(ps)

  // Apply the schema to the RDD.
  val personDF: DataFrame = sqlContext.createDataFrame(personRDD, schema)
like image 708
BAR Avatar asked Sep 12 '15 04:09

BAR


1 Answers

Basically, all the mapping you need to do can be achieved with DataFrame.select(...). (Here, I assume, that no type conversions need to be done.) Given the forward- and backward-mapping as maps, the essential part is

val mapping = from.map{ (x:(String, String)) => personsDF(x._1).as(x._2) }.toArray
// personsDF your original dataframe  
val mappedDF = personsDF.select( mapping: _* )

where mapping is an array of Columns with alias.

Example code

object Example {   

  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkContext, SparkConf}

  case class Person(name: String, age: Int)

  object Mapping {
    val from = Map("name" -> "a", "age" -> "b")
    val to = Map("a" -> "name", "b" -> "age")
  }

  def main(args: Array[String]) : Unit = {
    // init
    val conf = new SparkConf()
      .setAppName( "Example." )
      .setMaster( "local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // create persons
    val persons = Seq(Person("bob", 35), Person("alice", 27))
    val personsRDD = sc.parallelize(persons, 4)
    val personsDF = personsRDD.toDF

    writeParquet( personsDF, "persons.parquet", sc, sqlContext)

    val otherPersonDF = readParquet( "persons.parquet", sc, sqlContext )
  }

  def writeParquet(personsDF: DataFrame, path:String, sc: SparkContext, sqlContext: SQLContext) : Unit = {
    import Mapping.from

    val mapping = from.map{ (x:(String, String)) => personsDF(x._1).as(x._2) }.toArray

    val mappedDF = personsDF.select( mapping: _* )
    mappedDF.write.parquet("/output/path.parquet") // parquet with columns "a" and "b"
  }

  def readParquet(path: String, sc: SparkContext, sqlContext: SQLContext) : Unit = {
    import Mapping.to
    val df = sqlContext.read.parquet(path) // this df has columns a and b

    val mapping = to.map{ (x:(String, String)) => df(x._1).as(x._2) }.toArray
    df.select( mapping: _* )
  }
}

Remark

If you need to convert a dataframe back to an RDD[Person], then

val rdd : RDD[Row] = personsDF.rdd
val personsRDD : Rdd[Person] = rdd.map { r: Row => 
  Person( r.getAs("person"), r.getAs("age") )
}

Alternatives

Have also a look at How to convert spark SchemaRDD into RDD of my case class?

like image 194
Martin Senne Avatar answered Sep 21 '22 15:09

Martin Senne