Copy schema from one dataframe to another dataframe

I'm trying to change the schema of an existing dataframe to the schema of another dataframe.

DataFrame 1:

Column A | Column B | Column C | Column D
   "a"   |    1     |   2.0    |   300
   "b"   |    2     |   3.0    |   400
   "c"   |    3     |   4.0    |   500

DataFrame 2:

Column K | Column B | Column F
   "c"   |    4     |   5.0
   "b"   |    5     |   6.0
   "f"   |    6     |   7.0

So I want to apply the schema of the first DataFrame to the second: columns present in both are kept, columns in DataFrame 2 that are not in DataFrame 1 are dropped, and the remaining columns of DataFrame 1 are filled with "NULL".

Output:

Column A | Column B | Column C | Column D
 "NULL"  |    4     |   "NULL" |  "NULL"
 "NULL"  |    5     |   "NULL" |  "NULL"
 "NULL"  |    6     |   "NULL" |  "NULL"

So I came up with a possible solution:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val schema = df1.schema

// Keep the value when df2's field also exists in df1's schema,
// otherwise substitute the string "NULL".
val newRows: RDD[Row] = df2.map(row => {
  val values = row.schema.fields.map(s => {
    if (schema.fields.contains(s)) {
      row.getAs[Any](s.name).toString
    } else {
      "NULL"
    }
  })
  Row.fromSeq(values)
})
sqlContext.createDataFrame(newRows, schema)

Now, as you can see, this will not work: the target schema contains String, Int and Double types, while all my row values are Strings.

This is where I'm stuck. Is there a way to automatically convert my values to the types in the schema?
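
For what it's worth, I could imagine converting each value by hand based on the target field's data type (a rough sketch below that only covers the types in my schema), but I'm hoping Spark offers something more automatic:

import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType}

// Rough sketch: turn the stringified value back into the type the
// target schema expects; only covers the three types in my schema.
def convert(raw: String, dt: DataType): Any =
  if (raw == "NULL") null
  else dt match {
    case IntegerType => raw.toInt
    case DoubleType  => raw.toDouble
    case _           => raw
  }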

asked Apr 22 '16 by RudyVerboven

2 Answers

If the schema is flat, I would simply map over the pre-existing schema and select the required columns:

import org.apache.spark.sql.functions.{col, lit}

// Reuse df2's column when it has an identical field (same name, type
// and nullability); otherwise substitute a typed null column.
val exprs = df1.schema.fields.map { f =>
  if (df2.schema.fields.contains(f)) col(f.name)
  else lit(null).cast(f.dataType).alias(f.name)
}

df2.select(exprs: _*).printSchema

// root
//  |-- A: string (nullable = true)
//  |-- B: integer (nullable = false)
//  |-- C: double (nullable = true)
//  |-- D: integer (nullable = true)
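
Note that contains(f) compares entire StructFields, so a column that exists in both frames but differs in type or nullability would not match and would be nulled out. A small variant of the same idea (my addition, not part of the original answer) matches by name only and casts to the target type:

import org.apache.spark.sql.functions.{col, lit}

// Match by column name alone and cast to df1's type, so shared columns
// survive even when their type or nullability differs between frames.
val df2Columns = df2.columns.toSet
val exprsByName = df1.schema.fields.map { f =>
  if (df2Columns.contains(f.name)) col(f.name).cast(f.dataType).alias(f.name)
  else lit(null).cast(f.dataType).alias(f.name)
}

df2.select(exprsByName: _*)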
answered by zero323

Working in 2018 (Spark 2.3), reading a .sas7bdat file:

Scala

val sasFile = "file.sas7bdat"
val dfSas = spark.sqlContext.sasFile(sasFile) // requires the saurfang spark-sas7bdat library (see note below)
val myManualSchema = dfSas.schema // getting the schema from another DataFrame
val csvFile = "file.csv"          // placeholder path for the CSV to load
val df = spark.read.format("csv")
  .option("header", "true")
  .schema(myManualSchema)
  .load(csvFile)

P.S. spark.sqlContext.sasFile uses the saurfang library; you can skip that part of the code and get the schema from any other DataFrame.
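
For example, the schema source could just as well be a DataFrame read from Parquet; a minimal sketch with assumed file paths:

// Borrow the schema from any DataFrame you already have; here the
// source "reference.parquet" and target "data.csv" are assumed paths.
val schemaSource = spark.read.parquet("reference.parquet")
val df = spark.read
  .format("csv")
  .option("header", "true")
  .schema(schemaSource.schema)
  .load("data.csv")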

answered by Antonio Cachuan