
How to convert a Spark SchemaRDD into an RDD of my case class?

In the Spark docs it's clear how to create Parquet files from an RDD of your own case classes (from the docs):

val people: RDD[Person] = ??? // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")
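
For context, the "previous example" the docs refer to builds that RDD along these lines (a minimal sketch; the Person fields follow the Spark docs example):

case class Person(name: String, age: Int)

val people: org.apache.spark.rdd.RDD[Person] =
  sc.parallelize(Seq(Person("Alice", 29), Person("Bob", 30)))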

But it's not clear how to convert back. What we really want is a method readParquetFile where we can do:

val people: RDD[Person] = sc.readParquetFile[Person](path)

where the values of the case class are those which are read by the method.

asked Oct 03 '14 by samthebest

2 Answers

An easy way is to provide your own (Row) => CaseClass converter. This is a bit more manual, but if you know what you are reading it should be quite straightforward.

Here is an example:

import org.apache.spark.sql.{Row, SchemaRDD}

case class User(data: String, name: String, id: Long)

// Pattern match each Row against the expected column types;
// rows that don't fit the expected shape become None.
def sparkSqlToUser(r: Row): Option[User] = {
  r match {
    case Row(time: String, name: String, id: Long) => Some(User(time, name, id))
    case _ => None
  }
}

val parquetData: SchemaRDD = sqlContext.parquetFile("hdfs://localhost/user/data.parquet")

// flatMap over the Option drops any rows the converter couldn't handle
val caseClassRdd: org.apache.spark.rdd.RDD[User] = parquetData.flatMap(sparkSqlToUser)
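
From there the typed RDD can be used like any other RDD; for instance (a hypothetical check against the User schema above):

caseClassRdd.filter(_.id > 0).map(_.name).take(5).foreach(println)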

answered Sep 21 '22 by marios

The best solution I've come up with that requires the least amount of copying and pasting for new classes is as follows (I'd still like to see another solution, though).

First you have to define your case class and a (partially) reusable factory method:

import org.apache.spark.sql.catalyst.expressions

case class MyClass(fooBar: Long, fred: Long)

// Here you want to auto gen these functions using macros or something
object Factories extends java.io.Serializable {
  def longLong[T](fac: (Long, Long) => T)(row: expressions.Row): T = 
    fac(row(0).asInstanceOf[Long], row(1).asInstanceOf[Long])
}
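
If a case class has a different constructor signature you just add another method to Factories in the same style, e.g. for a (String, Long) constructor (a hypothetical sketch, not from the original answer):

  def stringLong[T](fac: (String, Long) => T)(row: expressions.Row): T =
    fac(row(0).asInstanceOf[String], row(1).asInstanceOf[Long])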

Some boilerplate, which will already be available:

import scala.reflect.runtime.universe._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

The magic

import scala.reflect.ClassTag
import org.apache.spark.sql.SchemaRDD

// Convert camelCase field names to the underscore_separated column names used in the table
def camelToUnderscores(name: String) =
  "[A-Z]".r.replaceAllIn(name, "_" + _.group(0).toLowerCase())

// Reflect over the case class to list its accessor methods, in declaration order
def getCaseMethods[T: TypeTag]: List[String] = typeOf[T].members.sorted.collect {
  case m: MethodSymbol if m.isCaseAccessor => m
}.toList.map(_.toString)

// Turn the accessor names into the corresponding SQL column names
def caseClassToSQLCols[T: TypeTag]: List[String] =
  getCaseMethods[T].map(_.split(" ")(1)).map(camelToUnderscores)

// Register the SchemaRDD as a temp table, select the columns in case-class order,
// then map each Row through the factory to get an RDD[T]
def schemaRDDToRDD[T: TypeTag: ClassTag](schemaRDD: SchemaRDD, fac: expressions.Row => T) = {
  val tmpName = "tmpTableName" // Maybe should use a random string
  schemaRDD.registerAsTable(tmpName)
  sqlContext.sql("SELECT " + caseClassToSQLCols[T].mkString(", ") + " FROM " + tmpName)
    .map(fac)
}
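
A quick sanity check of the helpers (hypothetical REPL session, assuming the MyClass defined above):

camelToUnderscores("fooBar")  // "foo_bar"
caseClassToSQLCols[MyClass]   // should give List("foo_bar", "fred")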

Example use

import org.apache.spark.rdd.RDD

val parquetFile = sqlContext.parquetFile(path)

val normalRDD: RDD[MyClass] =
  schemaRDDToRDD[MyClass](parquetFile, Factories.longLong[MyClass](MyClass.apply))
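
From here normalRDD is an ordinary RDD[MyClass] and can be used as such, e.g. (hypothetical usage):

normalRDD.filter(_.fred > 0).count()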

See also:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Convert-SchemaRDD-back-to-RDD-td9071.html

Though I failed to find any example or documentation by following the JIRA link.

answered Sep 22 '22 by samthebest