From the Spark source code:
/**
* Represents the content of the Dataset as an `RDD` of `T`.
*
* @group basic
* @since 1.6.0
*/
lazy val rdd: RDD[T] = {
val objectType = exprEnc.deserializer.dataType
rddQueryExecution.toRdd.mapPartitions { rows =>
rows.map(_.get(0, objectType).asInstanceOf[T])
}
}
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2972
The mapPartitions
can take as long as the time to compute the RDD
in the first place.. So this makes operations such as
df.rdd.getNumPartitions
very expensive. Given that a DataFrame
is DataSet[Row]
and a DataSet
is composed of RDD
's why is a re-mapping required? Any insights appreciated.
TL;DR That's because the internal RDD
is not RDD[Row]
.
Given that a DataFrame is
DataSet[Row]
and aDataSet
is composed of RDD's
That's a huge oversimplification. First of all DataSet[T]
doesn't mean that you interact with container of T
. It means that if you use collection-like API (often referred as strongly typed), internal representation will be decoded into T
.
The internal representation is a binary format used internally by Tungsten.This representation is internal and subject of changes and far too low level to be used in practice.
An intermediate representation, which exposes this data is the InternalRow
- rddQueryExecution.toRDD
is in fact RDD[InternalRow]
. This representation (there are different implementation) still exposes the internal types, is consider "weakly" private, as all objects in o.a.s.sql.catalyst
(the access is not explicitly restricted, but API is not documented), and rather tricky to interact with.
This where decoding comes into play and why you need full "re-mapping" - to convert internal, often unsafe, objects into external types intended for public usage.
Finally, to reiterate my previous statement - the code in question won't be executed when getNumPartitions
is called.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With