While my first instinct is to use DataFrames
for everything, it's just not possible -- some operations are clearly easier and / or better performing as RDD
operations, not to mention certain APIs like GraphX
only work on RDDs
.
I seem to be spending a lot of time these days converting back and forth between DataFrames
and RDDs
-- so what's the performance hit? Take RDD.checkpoint
-- there's no DataFrame
equivalent, so what happens under the hood when I do:
val df = Seq((1,2),(3,4)).toDF("key","value")
val rdd = df.rdd.map(...)
val newDf = rdd.map(r => (r.getInt(0), r.getInt(1))).toDF("key","value")
Obviously, this is a trivally small example, but it would be great to know what happens behind the scene in the conversion.
RDD – RDD API is slower to perform simple grouping and aggregation operations. DataFrame – DataFrame API is very easy to use. It is faster for exploratory analysis, creating aggregated statistics on large data sets.
While RDD offers low-level control over data, Dataset and DataFrame APIs bring structure and high-level abstractions. Keep in mind that transformations from an RDD to a Dataset or DataFrame are easy to execute.
Spark RDD to DataFrame However, Spark DataFrame resolved this issue as it is equipped with the concept of schema that is used to describe data which in turn reduces the burden and improves the performance. A DataFrame can be described as a collection of distributed data that is organized as named columns.
The next reason to not use RDD is the API it provides. Most of the operations like counting, grouping, etc. are pretty straightforward and easy-to-use APIs functions are built-in. But when it comes to operations like aggregation or finding averages, it becomes really hard to code using RDD.
Let's look at df.rdd
first. This is defined as:
lazy val rdd: RDD[Row] = {
// use a local variable to make sure the map closure doesn't capture the whole DataFrame
val schema = this.schema
queryExecution.toRdd.mapPartitions { rows =>
val converter = CatalystTypeConverters.createToScalaConverter(schema)
rows.map(converter(_).asInstanceOf[Row])
}
}
So firstly, it runs queryExecution.toRdd
, which basically prepares the execution plan based on the operators used to build up the DataFrame, and computes an RDD[InternalRow]
that represents the outcome of plan.
Next these InternalRow
s (which are only for internal use) of that RDD will be mapped to normal Row
s. This will entail the following for each row:
override def toScala(row: InternalRow): Row = {
if (row == null) {
null
} else {
val ar = new Array[Any](row.numFields)
var idx = 0
while (idx < row.numFields) {
ar(idx) = converters(idx).toScala(row, idx)
idx += 1
}
new GenericRowWithSchema(ar, structType)
}
}
So it loops over all elements, coverts them to 'scala' space (from Catalyst space), and creates the final row with them. toDf
will pretty much do these things in reverse.
This all will indeed have some impact on your performance. How much depends on how complex these operations are compared to the things you do with the data. The bigger possible impact however will be that Spark's Catalyst optimizer can only optimize the operations between the conversions to and from RDDs, rather than optimize the full execution plan in its whole. It would be interesting to see which operations you have trouble with, I find most things can be done using basic expressions or UDFs. Using modules that only work on RDDs is a very valid use case though!
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With