The following code worked in Spark SQL 1.2.1 without any deprecation warnings, yet it stopped working when I upgraded to 1.3.
Worked in 1.2.1 (without any deprecation warnings):
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)
import sqlContext._

val jsonRDD = sqlContext.jsonFile(jsonFilePath)
jsonRDD.registerTempTable("jsonTable")

val jsonResult = sql("select * from jsonTable")
val foo = jsonResult.zipWithUniqueId().map {
  case (Row(...), uniqueId) => // do something useful
  ...
}
foo.registerTempTable("...")
Stopped working in 1.3.0 (it simply does not compile, and all I did was switch to 1.3):
jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method
Workaround that does not work:

Although this does give me an RDD[Row]:

jsonResult.rdd.zipWithUniqueId()

it does not help, because RDD[Row] has no registerTempTable method, so the final step still fails:

foo.registerTempTable("...")
Here are my questions: is this a bug, and what is the recommended way to write this in 1.3?
It is not a bug, but sorry for the confusion! Up until Spark 1.3, Spark SQL was labeled an Alpha Component because the APIs were still in flux. With Spark 1.3 we graduated and stabilized the API; a full description of what you need to do when porting can be found in the documentation.
I can also answer your specific questions and give some justification for why we made these changes.
Stopped working in 1.3.0 (simply does not compile, and all I did was change to 1.3)
jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method
DataFrames are now a single unified interface across both Scala and Java. However, since we must maintain compatibility with the existing RDD API for the rest of 1.X, DataFrames are not RDDs. To get the RDD representation you can call df.rdd or df.javaRDD.

Additionally, because we were afraid of some of the confusion that can happen with implicit conversions, we made it so that you must explicitly call rdd.toDF to convert an RDD into a DataFrame. However, this conversion only works if your RDD holds objects that inherit from Product (i.e. tuples or case classes).
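For example, here is a minimal sketch of that round trip in 1.3 (assuming the sc and sqlContext from above; the Person case class and its fields are made up for illustration):

import sqlContext.implicits._  // brings the rdd.toDF conversion into scope

case class Person(name: String, zip: Int)  // a Product, so toDF works

// RDD[Person] -> DataFrame via the explicit toDF call
val peopleDF = sc.parallelize(Seq(Person("Michael", 94709))).toDF()

// DataFrame -> RDD[Row]; DataFrames are not RDDs, so this is also explicit
val rowRDD = peopleDF.rdd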
Back to the original question: if you want to do transformations on rows with an arbitrary schema, you will need to explicitly tell Spark SQL about the structure of the data after your map operation (since the compiler cannot infer it).
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val jsonData = sqlContext.jsonRDD(sc.parallelize("""{"name": "Michael", "zip": 94709}""" :: Nil))

// zipWithUniqueId produces Long ids, so the new column must be LongType
val newSchema =
  StructType(
    StructField("uniqueId", LongType) +: jsonData.schema.fields)

val augmentedRows = jsonData.rdd.zipWithUniqueId.map {
  case (row, id) => Row.fromSeq(id +: row.toSeq)
}

val newDF = sqlContext.createDataFrame(augmentedRows, newSchema)
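The resulting DataFrame can then be registered and queried just as before; the table name below is arbitrary:

newDF.registerTempTable("augmentedJson")
sqlContext.sql("select uniqueId, name from augmentedJson").show()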