
Why would one use DataFrame.select over DataFrame.rdd.map (or vice versa)?

Is there any "mechanical" difference between using select on a DataFrame to pick up information we need and mapping each row of the underlying RDD for the same purpose?

By "mechanical" I am referring to the the mechanism that performs the operations. Implementation details, in other words.

Which of the two is better/more performant?

df = # create dataframe ...
df.select("col1", "col2", ...)

or

df = # create dataframe ...
df.rdd.map(lambda row: (row[0], row[1], ...))

I am in the middle of performance testing, so I am going to find out which is faster, but I would also like to know what the implementation differences and pros/cons are.

asked Oct 18 '22 by ezamur

2 Answers

An RDD is just a lineage graph of transformations and actions.

A DataFrame has a logical plan that is internally optimized by the Catalyst query optimizer before an action is executed.

What does this mean in your case?

If you have a DataFrame then you should use select - any additional work, such as filtering or joining, will be optimized. An optimized DataFrame can be 10 times faster than a plain RDD. In other words, before executing the select Spark will try to make the query faster. This will not happen when you use dataFrame.rdd.map().
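
For example, here is a minimal sketch (the DataFrame and the column names are invented purely for illustration) of how the two approaches look to Spark: the select/where pipeline stays inside Catalyst's plan, while the lambda passed to rdd.map is opaque JVM code the optimizer cannot look into.

val df = spark.range(10).selectExpr("id AS col1", "id * 2 AS col2")

// Catalyst sees the projection and the filter, so it can prune columns,
// push down the predicate and generate efficient code for both:
df.select("col1").where("col1 > 5").explain(true)

// The same logic on the RDD is a black box to Catalyst - every Row is
// deserialized first and the lambdas run as-is:
df.rdd.map(row => row.getLong(0)).filter(_ > 5)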

One more thing: the rdd value is computed lazily, like this:

lazy val rdd: RDD[T] = {
    val objectType = exprEnc.deserializer.dataType
    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
    sparkSession.sessionState.executePlan(deserialized).toRdd.mapPartitions { rows =>
      rows.map(_.get(0, objectType).asInstanceOf[T])
    }
  }

So Spark will take its RDD, then map over it and cast the content. The DAGs of both versions will be almost the same for a query like the one in the question, so performance will be similar. However, in more advanced cases the benefits of using Datasets become very visible; as Spark PMC members wrote on the Databricks blog, Datasets can be even 100 times faster after optimization by Catalyst.

Be aware that DataFrame = Dataset[Row] and that it uses RDDs in the background - but the graph of RDDs is created after optimization.
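
You can see that equivalence directly - in Spark's sources the alias is declared as type DataFrame = Dataset[Row], so the following compiles without any conversion:

import org.apache.spark.sql.{DataFrame, Dataset, Row}

val df: DataFrame = spark.range(3).toDF("id")
val ds: Dataset[Row] = df   // DataFrame is just an alias for Dataset[Row]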

Note: Spark is unifying its APIs. Spark ML is now DataFrame-centric and the old RDD-based API should not be used. Streaming is moving towards Structured Streaming. So even if you do not see a big performance improvement in your case, consider using DataFrames. It is the better decision for future development and, of course, will be faster than using plain RDDs.

answered Oct 21 '22 by T. Gawęda


In this oversimplified example with DataFrame.select and DataFrame.rdd.map I think the difference might be almost negligible.

After all, you've already loaded your data set and are only doing a projection. Eventually both would have to deserialize the data from Spark's internal InternalRow binary format to calculate the result of an action.

You can check what happens with DataFrame.select using explain(extended = true), where you will see the logical plans (and the physical plan, too).

scala> spark.version
res4: String = 2.1.0-SNAPSHOT

scala> spark.range(5).select('id).explain(extended = true)
== Parsed Logical Plan ==
'Project [unresolvedalias('id, None)]
+- Range (0, 5, step=1, splits=Some(4))

== Analyzed Logical Plan ==
id: bigint
Project [id#17L]
+- Range (0, 5, step=1, splits=Some(4))

== Optimized Logical Plan ==
Range (0, 5, step=1, splits=Some(4))

== Physical Plan ==
*Range (0, 5, step=1, splits=Some(4))

Compare the physical plan (i.e. SparkPlan) to what you get with rdd.map (via toDebugString) and you'll know which might be "better".

scala> spark.range(5).rdd.toDebugString
res5: String =
(4) MapPartitionsRDD[8] at rdd at <console>:24 []
 |  MapPartitionsRDD[7] at rdd at <console>:24 []
 |  MapPartitionsRDD[6] at rdd at <console>:24 []
 |  MapPartitionsRDD[5] at rdd at <console>:24 []
 |  ParallelCollectionRDD[4] at rdd at <console>:24 []

(again in this contrived example I think that there is no winner -- both are as efficient as possible).

Please note that a DataFrame is really a Dataset[Row] that uses RowEncoder to encode (i.e. serialize) the data into the compact InternalRow binary format. If you were to execute more operators in a pipeline, you could get much better performance by sticking to Dataset rather than RDD, simply because of the low-level, behind-the-scenes logical query plan optimizations and that compact binary format.
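
As a rough sketch (the row count and column names here are made up), this is the kind of pipeline where staying in the Dataset API pays off: every step remains visible to Catalyst and whole-stage code generation, whereas the RDD version deserializes rows up front and then runs opaque lambdas.

import org.apache.spark.sql.functions.sum

val df = spark.range(1000000).toDF("id")

// Stays inside Catalyst: filter, projection and aggregation can be
// optimized together and compiled into a single generated stage.
df.where("id % 2 = 0")
  .selectExpr("id * 2 AS doubled")
  .agg(sum("doubled"))
  .show()

// Drops to the RDD early: rows are deserialized first and each lambda
// is a black box, so none of the optimizations above apply.
df.rdd
  .map(_.getLong(0))
  .filter(_ % 2 == 0)
  .map(_ * 2)
  .sum()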

There are a lot of optimizations and trying to beat them might often result in a waste of your time. You'd have to know the Spark internals by heart to get better performance (and the price would certainly be readability).

There's much to it and I'd strongly recommend watching the talk A Deep Dive into the Catalyst Optimizer by Herman van Hovell to know and appreciate all the optimizations.

My take on it is..."Stay away from RDDs unless you know what you're doing".

answered Oct 21 '22 by Jacek Laskowski