 

Is this a regression bug in Spark 1.3?

The following Spark SQL code, which compiled in 1.2.1 without any deprecation warnings, stopped working in 1.3.

Worked in 1.2.1 (without any deprecation warnings)

 import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.HiveContext

 val sqlContext = new HiveContext(sc)
 import sqlContext._
 val jsonRDD = sqlContext.jsonFile(jsonFilePath)
 jsonRDD.registerTempTable("jsonTable")

 val jsonResult = sql(s"select * from jsonTable")
 val foo = jsonResult.zipWithUniqueId().map {
   case (Row(...), uniqueId) => // do something useful
   ...
 }

 foo.registerTempTable("...")

Stopped working in 1.3.0 (simply does not compile, and all I did was change to 1.3)

jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method

A workaround that does not work:

Although this does give me an RDD[Row]:

jsonResult.rdd.zipWithUniqueId()  

the next step won't work, because RDD[Row] of course does not have a registerTempTable method:

     foo.registerTempTable("...")

Here are my questions:

  1. Is there a workaround? (e.g. am I simply doing it wrong?)
  2. Is this a bug? (I think that anything that worked in a previous version and stops compiling in a new one, without a @deprecated warning first, is clearly a regression bug.)
Asked Mar 24 '15 by Eran Medan


1 Answer

It is not a bug, but sorry for the confusion! Up until Spark 1.3, Spark SQL was labeled an Alpha Component as the APIs were still in flux. With Spark 1.3 we graduated and stabilized the API. A full description of what you need to do when porting can be found in the documentation.

I can also answer your specific questions and give some justification for why we made these changes.

Stopped working in 1.3.0 (simply does not compile, and all I did was change to 1.3) jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method

DataFrames are now one unified interface across both Scala and Java. However, since we must maintain compatibility with the existing RDD API for the rest of 1.X, DataFrames are not RDDs. To get the RDD representation, you can call df.rdd or df.javaRDD.
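
For example, a minimal sketch reusing the jsonResult DataFrame from the question above:

val rowRdd = jsonResult.rdd            // RDD[Row]; from Java use jsonResult.javaRDD
val withIds = rowRdd.zipWithUniqueId() // plain RDD operations work again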

Additionally, because we were afraid of some of the confusion that can happen with implicit conversions, we made it so that you must explicitly call rdd.toDF to convert an RDD into a DataFrame. This conversion is only available when your RDD holds objects that inherit from Product (i.e., tuples or case classes).
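
For instance, a minimal sketch with a hypothetical Person case class (not from the original post):

import sqlContext.implicits._              // brings rdd.toDF into scope in Spark 1.3

case class Person(name: String, zip: Int)  // case classes extend Product
val peopleDF = sc.parallelize(Seq(Person("Michael", 94709))).toDF()
peopleDF.registerTempTable("people")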

Back to the original question: if you want to do transformations on rows with an arbitrary schema, you will need to tell Spark SQL explicitly about the structure of the data after your map operation (since the compiler cannot infer it).

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val jsonData = sqlContext.jsonRDD(sc.parallelize("""{"name": "Michael", "zip": 94709}""" :: Nil))

// zipWithUniqueId produces Long ids, so the new column needs LongType
val newSchema =
  StructType(
    StructField("uniqueId", LongType) +: jsonData.schema.fields)

val augmentedRows = jsonData.rdd.zipWithUniqueId.map {
  case (row, id) =>
    Row.fromSeq(id +: row.toSeq)
}

val newDF = sqlContext.createDataFrame(augmentedRows, newSchema)
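
Since createDataFrame returns a DataFrame, the result can be registered as a temp table again, which was the original goal (the table name below is just illustrative):

newDF.registerTempTable("jsonTableWithIds")
sqlContext.sql("select uniqueId, name from jsonTableWithIds").show()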
Answered by Michael Armbrust