
More efficient way to loop through PySpark DataFrame and create new columns

I am converting some code written with Pandas to PySpark. The code has a lot of for loops to create a variable number of columns depending on user-specified inputs.

I'm using Spark 1.6.x, with the following sample code:

from pyspark.sql import SQLContext
from pyspark.sql import functions as F
import pandas as pd
import numpy as np

sqlContext = SQLContext(sc)  # sc is an existing SparkContext

# create a Pandas DataFrame, then convert to a Spark DataFrame
test = sqlContext.createDataFrame(pd.DataFrame({'val1': np.arange(1, 11)}))

Which leaves me with

+----+
|val1|
+----+
|   1|
|   2|
|   3|
|   4|
|   5|
|   6|
|   7|
|   8|
|   9|
|  10|
+----+

I loop a lot in the code, for example:

for i in np.arange(2,6).tolist():
    test = test.withColumn('val_' + str(i), F.lit(i ** 2) + test.val1)

Which results in:

+----+-----+-----+-----+-----+
|val1|val_2|val_3|val_4|val_5|
+----+-----+-----+-----+-----+
|   1|    5|   10|   17|   26|
|   2|    6|   11|   18|   27|
|   3|    7|   12|   19|   28|
|   4|    8|   13|   20|   29|
|   5|    9|   14|   21|   30|
|   6|   10|   15|   22|   31|
|   7|   11|   16|   23|   32|
|   8|   12|   17|   24|   33|
|   9|   13|   18|   25|   34|
|  10|   14|   19|   26|   35|
+----+-----+-----+-----+-----+

**Question:** How can I rewrite the above loop to be more efficient?

I've noticed that my code runs slower as Spark spends a lot of time on each group of loops, even on small datasets (around 2GB of text input).

Thanks

asked Oct 14 '16 by nevi_me


People also ask

What are two methods to create new DataFrame columns PySpark?

You can add multiple columns to a PySpark DataFrame in several ways. If you want to add a known set of columns, you can easily do it by chaining withColumn() or by using select().

Is DataFrame apply faster than for loop?

apply is not faster in itself, but it has advantages when used in combination with DataFrames. Whether it helps depends on the content of the apply expression: if it can be executed in Cython space, apply is much faster. We can use apply with a lambda function.

How do I make multiple columns in PySpark?

You can use reduce, for loops, or list comprehensions to apply PySpark functions to multiple columns in a DataFrame. Using iterators to apply the same operation to multiple columns is vital for maintaining a DRY codebase.


2 Answers

There is a small overhead from repeatedly calling JVM methods, but otherwise the for loop alone shouldn't be a problem. You can improve it slightly by using a single select:

df = spark.range(1, 11).toDF("val1")

def make_col(i):
    return (F.pow(F.lit(i), 2) + F.col("val1")).alias("val_{0}".format(i))

df.select("*", *(make_col(i) for i in range(2, 6)))

I would also avoid using NumPy types. Initializing NumPy objects is typically more expensive than plain Python objects, and Spark SQL doesn't support NumPy types, so some additional conversions are required.
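To illustrate that point, the `np.arange` call from the question can be replaced by a plain `range`; the variable names below are only for illustration:

```python
import numpy as np

# np.arange produces numpy.int64 values; .tolist() converts them to
# plain Python ints, but a plain range avoids building the NumPy array
# in the first place.
numpy_vals = np.arange(2, 6).tolist()
plain_vals = list(range(2, 6))
```

Both yield `[2, 3, 4, 5]` as plain ints, so `range(2, 6)` is a drop-in replacement for the loop in the question.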

answered Nov 20 '22 by zero323


One withColumn call operates on the entire RDD, so it's generally not good practice to call it once for every column you want to add. Instead, you can work with the columns and their data inside a map function: since a single map function does the job, the new columns and their values are computed in parallel.

a. Gather the new values based on your calculations.

b. Add these new column values to the main RDD as below:

val newColumns: Seq[Any] = Seq(newcol1,newcol2)
Row.fromSeq(row.toSeq.init ++ newColumns)

Here, row refers to the current row inside the map method.

c. Create the new schema as below:

val newColumnsStructType = StructType(Seq(new StructField("newcolName1", IntegerType), new StructField("newColName2", IntegerType)))

d. Append it to the old schema:

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType)

e. Create a new DataFrame with the new columns:

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema)
answered Nov 21 '22 by Ramzy