More efficient way to loop through PySpark DataFrame and create new columns

I am converting some code written with Pandas to PySpark. The code has a lot of for loops to create a variable number of columns depending on user-specified inputs.

I'm using Spark 1.6.x, with the following sample code:

from pyspark.sql import SQLContext
from pyspark.sql import functions as F
import pandas as pd
import numpy as np

# create a Pandas DataFrame, then convert to Spark DataFrame
test = sqlContext.createDataFrame(pd.DataFrame({'val1': np.arange(1,11)}))

Which leaves me with

+----+
|val1|
+----+
|   1|
|   2|
|   3|
|   4|
|   5|
|   6|
|   7|
|   8|
|   9|
|  10|
+----+

I loop a lot in the code, for example:

for i in np.arange(2,6).tolist():
    test = test.withColumn('val_' + str(i), F.lit(i ** 2) + test.val1)

Which results in:

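+----+-----+-----+-----+-----+
|val1|val_2|val_3|val_4|val_5|
+----+-----+-----+-----+-----+
|   1|    5|   10|   17|   26|
|   2|    6|   11|   18|   27|
|   3|    7|   12|   19|   28|
|   4|    8|   13|   20|   29|
|   5|    9|   14|   21|   30|
|   6|   10|   15|   22|   31|
|   7|   11|   16|   23|   32|
|   8|   12|   17|   24|   33|
|   9|   13|   18|   25|   34|
|  10|   14|   19|   26|   35|
+----+-----+-----+-----+-----+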

**Question:** How can I rewrite the above loop to be more efficient?

I've noticed that my code runs slower as Spark spends a lot of time on each group of loops (even on small datasets like 2GB of text input).


2 Answers

There is a small overhead from repeatedly calling JVM methods, but otherwise the for loop alone shouldn't be a problem. You can improve it slightly by using a single select:

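from pyspark.sql import functions as F

df = spark.range(1, 11).toDF("val1")

def make_col(i):
    # val_<i> = i ** 2 + val1, built as a single Column expression
    return (F.pow(F.lit(i), 2) + F.col("val1")).alias("val_{0}".format(i))

# add every new column in one projection instead of one withColumn call per column
df.select("*", *(make_col(i) for i in range(2, 6)))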

I would also avoid using NumPy types. Initializing NumPy objects is typically more expensive than initializing plain Python objects, and Spark SQL doesn't support NumPy types, so some additional conversions are required.
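
On Spark 1.6, where the `spark` session object used above is not available, the same single-projection idea can be sketched with `DataFrame.selectExpr` and plain Python integers. This is only an illustration: `build_exprs` and `add_columns` are hypothetical helper names, and the column name `val1` is taken from the question. Folding `i ** 2` into the SQL string as an integer literal should also keep the new columns integral, whereas `F.pow` yields doubles as far as I know.

```python
def build_exprs(start, stop, base_col="val1"):
    """Return one SQL expression string per new column.

    Uses plain Python ints from range(), so no NumPy types are involved.
    """
    return [
        "{0} + {1} AS val_{2}".format(base_col, i ** 2, i)
        for i in range(start, stop)
    ]


def add_columns(df, start=2, stop=6):
    # One projection that keeps the existing columns ("*") and appends
    # all new ones, instead of one withColumn call per column.
    return df.selectExpr("*", *build_exprs(start, stop))


print(build_exprs(2, 6))
# ['val1 + 4 AS val_2', 'val1 + 9 AS val_3', 'val1 + 16 AS val_4', 'val1 + 25 AS val_5']
```

With the question's DataFrame this would be called as `test = add_columns(test)`.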

Each withColumn call works on the entire RDD, so it is generally not good practice to call it once for every column you want to add. Instead, you can work with the columns and their data inside a single map function. Since one map function does the whole job, the code that adds the new columns and their data runs in parallel.

a. Gather the new values based on your calculations

b. Add these new column values to the main RDD as below

val newColumns: Seq[Any] = Seq(newcol1,newcol2)
Row.fromSeq(row.toSeq.init ++ newColumns)

Here, row is the reference to the current row inside the map method

c. Create new schema as below

val newColumnsStructType = StructType(Seq(StructField("newcolName1", IntegerType), StructField("newColName2", IntegerType)))

d. Add to the old schema

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType)

e. Create new dataframe with new columns

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema)
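
Since the question is about PySpark, steps a to e can be roughly sketched in Python as follows. This is an assumption-laden translation, not the answer's own code: `extend_row` and `add_columns_via_map` are hypothetical names, the new columns reuse the `val_i = i ** 2 + val1` rule from the question, and, unlike the Scala snippets, no existing column is dropped with `init`. The PySpark import sits inside the second function so the row logic can be tried without a cluster.

```python
def extend_row(values, powers=range(2, 6)):
    """Steps a-b: compute the new values and append them to one row.

    `values` is a tuple whose first element is val1 (an assumption
    taken from the question's DataFrame).
    """
    val1 = values[0]
    return tuple(values) + tuple(i ** 2 + val1 for i in powers)


def add_columns_via_map(sql_context, df, powers=range(2, 6)):
    # Imported lazily so extend_row stays usable without PySpark installed.
    from pyspark.sql.types import LongType, StructField, StructType

    # Steps c-d: old schema fields plus one new field per added column.
    new_fields = [StructField("val_{0}".format(i), LongType(), True) for i in powers]
    new_schema = StructType(df.schema.fields + new_fields)

    # Step e: map every row once, then rebuild the DataFrame.
    new_rdd = df.rdd.map(lambda row: extend_row(tuple(row), powers))
    return sql_context.createDataFrame(new_rdd, new_schema)


print(extend_row((1,)))  # (1, 5, 10, 17, 26)
```

With the question's objects this would be called as `test = add_columns_via_map(sqlContext, test)`.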
