
Why does Spark/Scala compiler fail to find toDF on RDD[Map[Int, Int]]?

Why does the following end up with an error?

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> val rdd = sc.parallelize(1 to 10).map(x => (Map(x  -> 0), 0))
rdd: org.apache.spark.rdd.RDD[(scala.collection.immutable.Map[Int,Int], Int)] = MapPartitionsRDD[20] at map at <console>:27

scala> rdd.toDF
res8: org.apache.spark.sql.DataFrame = [_1: map<int,int>, _2: int]

scala> val rdd = sc.parallelize(1 to 10).map(x => Map(x  -> 0))
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[Int,Int]] = MapPartitionsRDD[23] at map at <console>:27

scala> rdd.toDF
<console>:30: error: value toDF is not a member of org.apache.spark.rdd.RDD[scala.collection.immutable.Map[Int,Int]]
              rdd.toDF

So what exactly is happening here? toDF can convert an RDD of type (scala.collection.immutable.Map[Int,Int], Int) to a DataFrame, but not an RDD of type scala.collection.immutable.Map[Int,Int]. Why is that?

Pravin Gadakh asked Sep 11 '15 11:09


2 Answers

For the same reason you cannot use:

sqlContext.createDataFrame((1 to 10).map(x => Map(x -> 0)))

If you take a look at the org.apache.spark.sql.SQLContext source you'll find two different implementations of the createDataFrame method:

def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame  

and

def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame 

As you can see, both require A to be a subtype of Product. Calling toDF on an RDD[(Map[Int,Int], Int)] works because Tuple2 is a Product. Map[Int,Int] by itself is not, hence the error.

You can make it work by wrapping Map with Tuple1:

sc.parallelize(1 to 10).map(x => Tuple1(Map(x  -> 0))).toDF
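
The type bound can be tried out without Spark. The following is only a sketch in plain Scala: `ProductBoundSketch` and `arities` are hypothetical names, and the helper merely mirrors the `A <: Product` constraint shown above rather than anything Spark actually does internally.

```scala
// Plain Scala sketch, no Spark required.
// `arities` is a hypothetical helper; its bound imitates createDataFrame's `A <: Product`.
object ProductBoundSketch {
  // Accepts only element types that are subtypes of scala.Product
  // and returns the number of fields of each element.
  def arities[A <: Product](rows: Seq[A]): Seq[Int] =
    rows.map(_.productArity)

  def main(args: Array[String]): Unit = {
    // Tuple2 is a Product, so this call compiles.
    val pairs = (1 to 3).map(x => (Map(x -> 0), 0))
    println(arities(pairs))

    // Map[Int, Int] is not a Product, so this call is rejected at compile time:
    // arities((1 to 3).map(x => Map(x -> 0)))

    // Wrapping each Map in Tuple1 satisfies the bound again.
    val wrapped = (1 to 3).map(x => Tuple1(Map(x -> 0)))
    println(arities(wrapped))
  }
}
```

Uncommenting the middle call should produce a type-bound error from the compiler, which is the plain-Scala analogue of the failure in the question.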
zero323 answered Oct 13 '22 22:10


Basically because there is no implicit to create a DataFrame for a Map inside an RDD.

In your first example you are returning a Tuple, which is a Product, and for Products there is an implicit conversion:

rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A])

In the second example you have a Map in your RDD, for which there is no implicit conversion.
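
To see why the compiler reports "value toDF is not a member" rather than a bound error, here is a rough sketch of the same mechanism in plain Scala. `ImplicitBoundSketch`, `Holder` and `seqToHolder` are hypothetical stand-ins for Spark's holder class and `rddToDataFrameHolder`, and this `toDF` only returns a description string.

```scala
import scala.language.implicitConversions

// Plain Scala sketch of a Product-bounded implicit conversion, no Spark required.
object ImplicitBoundSketch {
  // Hypothetical stand-in for the holder type that carries toDF.
  class Holder[A <: Product](rows: Seq[A]) {
    // Describes the "frame" instead of building a real DataFrame.
    def toDF: String = {
      val columns = rows.headOption.map(_.productArity).getOrElse(0)
      s"${rows.size} row(s), $columns column(s)"
    }
  }

  // The conversion is only applicable when the element type is a Product.
  implicit def seqToHolder[A <: Product](rows: Seq[A]): Holder[A] =
    new Holder(rows)

  def main(args: Array[String]): Unit = {
    // Seq has no toDF, so the compiler applies seqToHolder here.
    val tuples = Seq((Map(1 -> 0), 0), (Map(2 -> 0), 0))
    println(tuples.toDF)

    // No conversion applies to Seq[Map[Int, Int]], so toDF is simply not found:
    // val maps = Seq(Map(1 -> 0))
    // maps.toDF
  }
}
```

Because the conversion never becomes a candidate for a sequence of Maps, the compiler only sees a type without a toDF member, which matches the error message in the question.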

Patrick McGloin answered Oct 14 '22 00:10