Does Spark do one pass through the data for multiple withColumn?

Does Spark make one pass or multiple passes through the data when multiple withColumn calls are chained?

For example:

val dfnew = df.withColumn("newCol1", f1(col("a")))
              .withColumn("newCol2", f2(col("b")))
              .withColumn("newCol3", f3(col("c")))

where

  • df is my input DataFrame containing at least the columns a, b, c
  • dfnew is the output DataFrame with the three new columns newCol1, newCol2, newCol3
  • f1, f2, f3 are user-defined functions or generic Spark column operations such as cast. In my project I can have as many as 30 independent withColumn calls chained with foldLeft (see the sketch below).
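
For instance, a minimal sketch of such a foldLeft chain (the column expressions here are illustrative stand-ins for f1, f2, f3):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// Illustrative stand-ins for f1, f2, f3: each entry maps a new column
// name to the expression that computes it.
val transforms: Seq[(String, Column)] = Seq(
  "newCol1" -> col("a").cast("string"),
  "newCol2" -> col("b") + 1,
  "newCol3" -> col("c") * 2
)

// Fold over the input DataFrame, adding one column per entry.
val dfnew = transforms.foldLeft(df) { case (acc, (name, expr)) =>
  acc.withColumn(name, expr)
}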

Important

I am assuming here that f2 does not depend on the result of f1, and f3 does not depend on the results of f1 and f2, so the functions could be performed in any order. None of the functions involves a shuffle.

My observations

  • all functions are in the same stage
  • adding another withColumn does not increase execution time in a way that would suggest additional passes through the data
  • I compared, for example, a single SQLTransformer whose select statement contains all the functions against multiple separate SQLTransformers, one per function, and the execution times were similar

Questions

  • Will Spark make one pass through the data, or three, one per withColumn?
  • Does it depend on the type of the functions f1, f2, f3, i.e. UDFs vs. generic Spark operations?
  • If the functions f1, f2, f3 are in the same stage, does that mean they happen in the same pass over the data?
  • Does the number of passes depend on shuffles within the functions? What if there is no shuffle?
  • If I chain the withColumn calls with foldLeft, will that change the number of passes?
  • I could do something similar with three SQLTransformers, or with a single SQLTransformer containing all three transformations in the same select statement. How many passes through the data would each do?
  • Basically, does it not matter, i.e. will the execution time be similar for one pass and for three?
asked Dec 18 '17 by astro_asz


1 Answer

Will Spark make one pass through the data, or three, one per withColumn?

Spark will "make one passage" through the data. Why? Because spark doesn't actually do anything when this code is reached, it just builds an execution plan which would tell it what to do when dfnew is used (i.e. some action, e.g. count, collect, write etc.) is executed on it. Then, it would be able to compute all functions at once for each record.

Does it depend on the type of the functions f1, f2, f3, i.e. UDFs vs. generic Spark operations?

No.

If the functions f1, f2, f3 are in the same stage, does that mean they happen in the same pass over the data?

Yes.

Does the number of passes depend on shuffles within the functions? What if there is no shuffle?

Almost. First, as long as no caching or checkpointing is used, the number of passes over the data equals the number of actions executed on the resulting dfnew DataFrame. Second, each shuffle means each record is read, potentially sent between worker nodes, potentially written to disk, and then read again.
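
For example, if dfnew feeds two actions, caching it keeps the second action from re-reading the source (a sketch; the output path is illustrative):

dfnew.cache()                       // marks dfnew for caching; nothing is computed yet
dfnew.count()                       // first action: one pass over the source, populating the cache
dfnew.write.parquet("/tmp/dfnew")   // second action: served from the cache instead of the source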

If I chain the withColumn calls with foldLeft, will that change the number of passes?

No. It only changes the way the above-mentioned plan is constructed; the resulting plan is exactly the same, so the computation remains the same.

I could do something similar with three SQLTransformers, or with a single SQLTransformer containing all three transformations in the same select statement. How many passes through the data would each do?

Again, this won't make any difference, as the execution plan will remain the same.
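
For illustration, a single SQLTransformer covering all three columns might look like this sketch (__THIS__ is SQLTransformer's placeholder for the input table, and the expressions stand in for f1, f2, f3):

import org.apache.spark.ml.feature.SQLTransformer

// One statement computing all three new columns at once.
val all3 = new SQLTransformer().setStatement(
  "SELECT *, CAST(a AS STRING) AS newCol1, b + 1 AS newCol2, c * 2 AS newCol3 FROM __THIS__")

// transform() builds the same kind of lazy plan; an action on the result
// still makes a single pass, just like the chained withColumn version.
val out = all3.transform(df)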

Basically, does it not matter, i.e. will the execution time be similar for one pass and for three?

Not sure what this means, but it sounds incorrect: execution time is mostly a function of the number of shuffles and the number of actions (assuming the same data and the same cluster setup).

answered Oct 12 '22 by Tzach Zohar