Does Spark do one or multiple passes through data when multiple withColumn
functions are chained?
For example:
val dfnew = df.withColumn("newCol1", f1(col("a")))
  .withColumn("newCol2", f2(col("b")))
  .withColumn("newCol3", f3(col("c")))
where

- df is my input DataFrame, containing at least columns a, b, c
- dfnew is the output DataFrame with three new columns newCol1, newCol2, newCol3
- f1, f2, f3 are some user-defined functions or some Spark operations on columns (cast, etc.)

In my project I can have even 30 independent withColumn functions chained with foldLeft.

Important: I am assuming here that f2 does not depend on the result of f1, and that f3 does not depend on the results of f1 and f2, so the functions could be performed in any order. There is no shuffle in any function.
My observations

- withColumn does not increase execution time in a way that would suggest additional passes through the data.
- I compared a single SQLTransformer with a select statement containing all the functions against multiple separate SQLTransformers, one per function, and the execution time was similar.

Questions
- Will Spark make one pass through the data, or one pass per withColumn?
- Does it depend on the type of the functions f1, f2, f3? UDF vs. generic Spark operations?
- If the functions f1, f2, f3 are inside the same stage, does that mean they are in the same data pass?
- If I chain the withColumn functions with foldLeft, will it change the number of passes?
- I could do something similar with three SQLTransformers, or with just one SQLTransformer whose select_statement contains all three transformations. How many passes through the data would that do?
Will Spark make one pass through the data, or three, one for each withColumn?
Spark will make one pass through the data. Why? Because Spark doesn't actually do anything when this code is reached; it just builds an execution plan that tells it what to do once an action (e.g. count, collect, write) is executed on dfnew. At that point it can compute all the functions at once for each record.
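This per-record pipelining can be sketched without Spark at all: a plain Scala iterator shows how three independent per-column functions are applied in a single traversal. Record, readAll, and the stand-in functions below are all hypothetical, purely for illustration.

```scala
// Minimal sketch (no Spark): three independent functions applied in one
// fused pass over the data, mirroring how Spark pipelines chained
// withColumn calls inside a single stage.
case class Record(a: Int, b: Int, c: Int)

var reads = 0 // how many times a record is pulled from "storage"

def readAll(data: Seq[Record]): Iterator[Record] =
  data.iterator.map { r => reads += 1; r }

val data = Seq(Record(1, 2, 3), Record(4, 5, 6))

// Stand-ins for f1, f2, f3: each reads a different input column, and all
// three are computed together for each record before the next is touched.
val out = readAll(data).map { r =>
  (r.a + 1, r.b * 2, r.c - 1) // f1, f2, f3 in the same pass
}.toList

assert(reads == data.size) // 2 reads, not 6: one pass, not three
```

The same intuition carries over to Spark: narrow transformations in one stage are evaluated record by record, not pass by pass.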
Does it depend on the type of functions f1, f2, f3? UDF vs generic Spark operations?
No.
If the functions f1, f2, f3 are inside the same stage, does it mean they are in the same data pass?
Yes.
Does the number of passes depend on shuffles within the functions? What if there is no shuffle?
Almost. First, as long as no caching / checkpointing is used, the number of passes over the data will be the number of actions executed on the resulting dfnew DataFrame. Second, each shuffle means each record is read, potentially sent between worker nodes, potentially written to disk, and then read again.
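The "one pass per action" point can likewise be sketched in plain Scala with an instrumented source standing in for the input data; the names here are illustrative, not Spark API.

```scala
// Minimal sketch (no Spark): each "action" on an uncached pipeline
// re-reads the source, while materializing (akin to cache()) avoids it.
var reads = 0
def source(): Iterator[Int] = (1 to 3).iterator.map { x => reads += 1; x }
def pipeline(): Iterator[Int] = source().map(_ * 2) // the transformations

val count = pipeline().size // "action" 1: a full pass over the data
val total = pipeline().sum  // "action" 2: another full pass
assert(reads == 6)          // 3 records, read twice

reads = 0
val cached = pipeline().toList // one pass to materialize, like cache()
assert(cached.sum == 12 && reads == 3) // later reuse costs no new reads
```

In Spark terms: two actions on an uncached dfnew mean two scans of the input, regardless of how many withColumn calls the plan contains.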
If I chain the withColumn functions with foldLeft, will it change the number of passes?
No. It will only change the way the above-mentioned plan is constructed, but it will have no effect on what the plan looks like (it would be the exact same plan), so the computation will remain the same.
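That foldLeft builds the very same chain can be seen with plain function composition; in this sketch a String stands in for the DataFrame so it runs without Spark, and the function bodies are hypothetical.

```scala
// Sketch: foldLeft over DataFrame => DataFrame steps yields exactly the
// hand-written chain. String stands in for DataFrame here.
val f1: String => String = _ + ".withColumn(newCol1)"
val f2: String => String = _ + ".withColumn(newCol2)"
val f3: String => String = _ + ".withColumn(newCol3)"

val chained = f3(f2(f1("df")))                          // written by hand
val folded  = List(f1, f2, f3).foldLeft("df")((acc, f) => f(acc))

assert(chained == folded) // the same "plan" either way
```

This is why chaining 30 withColumn calls via foldLeft is purely a code-organization choice: Spark receives an identical logical plan.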
I could do something similar with three SQLTransformers or just one SQLTransformer with all three transformations in the same select_statement. How many passes through the data would that do?
Again, this won't make any difference, as the execution plan will remain the same.
Basically it doesn't matter, the time of execution will be similar for 1 and 3 passes?

Not sure what this means, but it sounds incorrect: the time of execution is mostly a function of the number of shuffles and the number of actions (assuming the same data and the same cluster setup).