I have the following code:
val df_in = sqlContext.read.json(jsonFile) // the file resides in HDFS
//some operations in here to create df from df_in with two more columns "terms1" and "terms2"
val intersectUDF = udf( (seq1: Seq[String], seq2: Seq[String]) => seq1 intersect seq2 ) // intersects two sequences
val symmDiffUDF = udf( (seq1: Seq[String], seq2: Seq[String]) => (seq1 diff seq2) ++ (seq2 diff seq1) ) // computes the symmetric difference of two sequences
val df1 = (df.withColumn("termsInt", intersectUDF(df("terms1"), df("terms2")))
             .withColumn("termsDiff", symmDiffUDF(df("terms1"), df("terms2")))
             .where( size(col("termsInt")) > 0 && size(col("termsDiff")) > 0 && size(col("termsDiff")) <= 2 )
             .cache()
          ) // add the intersection and difference columns and filter the resulting DF
df1.show()
df1.count()
The app works properly and fast up to the show(), but the count() step creates 40,000 tasks. My understanding is that df1.show() should trigger the full creation of df1, so df1.count() should then be very fast. What am I missing here? Why is count() that slow?
Thank you very much in advance, Roxana
Some typical actions are: count() — computes the number of rows in the dataset represented by the DataFrame; show() — prints 20 records from the dataset to the screen; collect() — returns all the records in the dataset to the driver program.
Actions are RDD operations that return a value back to the Spark driver program, which kicks off a job to execute on the cluster. A transformation's output is an input to actions. reduce, collect, takeSample, take, first, saveAsTextFile, saveAsSequenceFile, countByKey, and foreach are common actions in Apache Spark.
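A minimal sketch of that distinction (assuming a local SparkSession named spark): transformations like map and filter only record lineage, while an action like count actually submits a job:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("lazy-demo").getOrCreate()

val rdd = spark.sparkContext.parallelize(1 to 1000)
val transformed = rdd.map(_ * 2).filter(_ % 4 == 0) // transformations: nothing runs yet

val n = transformed.count() // action: a job is submitted to the cluster here
```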
One exception is schema inference: when you ask Spark to infer the schema, that scan can take a long time, so under the hood Spark already launches a job to do the inferring. It's that simple. This belongs to the optimization and performance aspect and cannot be classified as an action or a transformation.
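One way to avoid that extra inference job is to supply the schema explicitly when reading the JSON. A sketch against the asker's file (the field names and types here are assumptions; adjust them to your data):

```scala
import org.apache.spark.sql.types._

// Hypothetical schema matching the JSON file.
val schema = StructType(Seq(
  StructField("terms1", ArrayType(StringType)),
  StructField("terms2", ArrayType(StringType))
))

// With an explicit schema, no job is launched at read time to infer it.
val df_in = sqlContext.read.schema(schema).json(jsonFile)
```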
show is indeed an action, but it is smart enough to know when it doesn't have to run everything. If you had an orderBy it would take very long too, but in this case all your operations are map operations, so there's no need to compute the whole final table. However, count needs to physically go through the whole table in order to count it, and that's why it's taking so long. You could test what I'm saying by adding an orderBy to df1's definition; then it should take long.
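The suggested test could look like this (a sketch against the asker's pipeline; the sort column is just an illustration):

```scala
import org.apache.spark.sql.functions.col

// A sort introduces a total ordering, so show() can no longer stop early:
val df1Sorted = df1.orderBy(col("terms1"))
df1Sorted.show() // now show() also has to materialize the whole table
```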
EDIT: Also, the 40k tasks are likely due to the number of partitions your DataFrame is split into. Try using df1.repartition(<a sensible number here, depending on cluster and DF size>) and then run count again.
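For example (the partition count of 200 is only an assumption; tune it to your cluster and data size):

```scala
// Fewer partitions means fewer tasks scheduled for the count job.
val df2 = df1.repartition(200).cache()
df2.count()
```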
show() by default displays only 20 rows, so if the first partitions return enough rows, the remaining partitions are not executed. Note that show has several variations: show(n) prints the first n rows, and a large n forces more (possibly all) partitions to be executed and may take more time, while show(false) only disables column truncation and still prints 20 rows. So show() equals show(20), which is a partial action.
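The common variants from the standard DataFrame API look like:

```scala
df1.show()          // same as show(20): first 20 rows, long values truncated
df1.show(100)       // first 100 rows; may need to execute more partitions
df1.show(20, false) // 20 rows, but without truncating long cell values
```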