Sometimes (e.g. for testing and bechmarking) I want force the execution of the transformations defined on a DataFrame. AFAIK calling an action like count
does not ensure that all Columns
are actually computed, show
may only compute a subset of all Rows
(see examples below)
My solution is to write the DataFrame
to HDFS using df.write.saveAsTable
, but this "clutters" my system with tables I don't want to keep any further.
So what is the best way to trigger the evaluation of a DataFrame
?
Edit:
Note that there is also a recent discussion on the spark developer list : http://apache-spark-developers-list.1001551.n3.nabble.com/Will-count-always-trigger-an-evaluation-of-each-row-td21018.html
I made a small example which shows that count
on DataFrame
does not evaluate everything (tested using Spark 1.6.3 and spark-master = local[2]
):
val df = sc.parallelize(Seq(1)).toDF("id")
val myUDF = udf((i:Int) => {throw new RuntimeException;i})
df.withColumn("test",myUDF($"id")).count // runs fine
df.withColumn("test",myUDF($"id")).show() // gives Exception
Using the same logic, here an example that show
does not evaluate all rows:
val df = sc.parallelize(1 to 10).toDF("id")
val myUDF = udf((i:Int) => {if(i==10) throw new RuntimeException;i})
df.withColumn("test",myUDF($"id")).show(5) // runs fine
df.withColumn("test",myUDF($"id")).show(10) // gives Exception
Edit 2 : For Eliasah: The Exception says this:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost): java.lang.RuntimeException
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(<console>:68)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
.
.
.
.
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1376)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1375)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1457)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
.
.
.
.
Lazy Evaluation in Sparks means Spark will not start the execution of the process until an ACTION is called. We all know from previous lessons that Spark consists of TRANSFORMATIONS and ACTIONS. Until we are doing only transformations on the dataframe/dataset/RDD, Spark is the least concerned.
Although most things in Spark SQL are executed lazily, Commands evaluate eagerly. It means that Apache Spark executes them as soon as you define them in your pipeline, e.g. using sql() method.
To get the schema of the Spark DataFrame, use printSchema() on Spark DataFrame object. From the above example, printSchema() prints the schema to console( stdout ) and show() displays the content of the Spark DataFrame.
By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache ) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.
It's a bit late, but here's the fundamental reason: count
does not act the same on RDD
and DataFrame
.
In DataFrame
s there's an optimization, as in some cases you do not require to load data to actually know the number of elements it has (especially in the case of yours where there's no data shuffling involved). Hence, the DataFrame
materialized when count
is called will not load any data and will not pass into your exception throwing. You can easily do the experiment by defining your own DefaultSource
and Relation
and see that calling count
on a DataFrame
will always end up in the method buildScan
with no requiredColumns
no matter how many columns you did select (cf. org.apache.spark.sql.sources.interfaces
to understand more). It's actually a very efficient optimization ;-)
In RDD
s though, there's no such optimizations (that's why one should always try to use DataFrame
s when possible). Hence the count
on RDD
executes all the lineage and returns the sum of all sizes of the iterators composing any partitions.
Calling dataframe.count
goes into the first explanation, but calling dataframe.rdd.count
goes into the second as you did build an RDD
out of your DataFrame
. Note that calling dataframe.cache().count
forces the dataframe
to be materialized as you required Spark to cache the results (hence it needs to load all the data and transform it). But it does have the side-effect of caching your data...
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With