From the Spark source code:
/**
 * Represents the content of the Dataset as an `RDD` of `T`.
 *
 * @group basic
 * @since 1.6.0
 */
lazy val rdd: RDD[T] = {
  val objectType = exprEnc.deserializer.dataType
  rddQueryExecution.toRdd.mapPartitions { rows =>
    rows.map(_.get(0, objectType).asInstanceOf[T])
  }
}
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2972
The mapPartitions can take as long as computing the RDD in the first place, which makes operations such as
df.rdd.getNumPartitions
very expensive. Given that a DataFrame is Dataset[Row] and a Dataset is composed of RDDs, why is a re-mapping required? Any insights appreciated.
TL;DR That's because the internal RDD is not RDD[Row].
Given that a DataFrame is Dataset[Row] and a Dataset is composed of RDDs
That's a huge oversimplification. First of all, Dataset[T] doesn't mean that you interact with a container of T. It means that, if you use the collection-like API (often referred to as the strongly typed API), the internal representation will be decoded into T.
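A small illustration of that distinction (a sketch for spark-shell, where a SparkSession named spark and its implicits are in scope; Record is a hypothetical case class):

import org.apache.spark.sql.functions.upper
import spark.implicits._

case class Record(id: Long, value: String)

val ds = Seq(Record(1L, "a"), Record(2L, "b")).toDS()

// Typed ("collection-like") API: every internal row is decoded into a Record
// before the function runs, so the deserializer sits on the hot path.
val typed = ds.map(r => r.value.toUpperCase)

// Untyped (DataFrame) API: the expression is evaluated directly on the
// internal representation; no Record objects are materialized.
val untyped = ds.select(upper($"value"))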
The internal representation is a binary format used internally by Tungsten. This representation is internal, subject to change, and far too low-level to be used in practice.
An intermediate representation which exposes this data is the InternalRow - rddQueryExecution.toRdd is in fact an RDD[InternalRow]. This representation (there are different implementations) still exposes the internal types, is considered "weakly" private, like all objects in o.a.s.sql.catalyst (access is not explicitly restricted, but the API is not documented), and is rather tricky to interact with.
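Both representations are reachable from user code, which makes the difference easy to see (a sketch for spark-shell; queryExecution is a developer-facing field, so treat it accordingly):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

// Internal representation: Catalyst InternalRows (UnsafeRow instances at
// runtime), exposed through the query execution and not meant for general use.
val internal: RDD[InternalRow] = df.queryExecution.toRdd

// External representation: Dataset.rdd decodes every InternalRow into an
// org.apache.spark.sql.Row - the "re-mapping" discussed here.
val external: RDD[Row] = df.rdd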
This is where decoding comes into play, and why you need the full "re-mapping" - to convert the internal, often unsafe, objects into external types intended for public usage.
Finally, to reiterate my previous statement - the code in question won't be executed when getNumPartitions is called.
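A rough sketch of that last point (again assuming a spark-shell session): mapPartitions is a transformation, so asking the resulting RDD for its number of partitions does not run the per-row decoding; only an action does.

// 8 partitions of longs from 0 until 1000000
val df = spark.range(0L, 1000000L, 1L, 8).toDF("id")

// Forces query planning and builds the RDD lineage, but the decoding
// mapPartitions is lazy: no rows are deserialized here.
df.rdd.getNumPartitions  // 8

// Only an action executes the lineage, including the decode into Row objects.
df.rdd.count()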