I am converting some code written with Pandas to PySpark. The code has a lot of for
loops to create a variable number of columns depending on user-specified inputs.
I'm using Spark 1.6.x, with the following sample code:
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
# create a Pandas DataFrame, then convert to Spark DataFrame
test = sqlContext.createDataFrame(pd.DataFrame({'val1': np.arange(1,11)}))
Which leaves me with
+----+
|val1|
+----+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
+----+
I loop a lot in the code, for example the below:
for i in np.arange(2, 6).tolist():
    test = test.withColumn('val_' + str(i), F.lit(i ** 2) + test.val1)
Which results in:
+----+-----+-----+-----+-----+
|val1|val_2|val_3|val_4|val_5|
+----+-----+-----+-----+-----+
| 1| 5| 10| 17| 26|
| 2| 6| 11| 18| 27|
| 3| 7| 12| 19| 28|
| 4| 8| 13| 20| 29|
| 5| 9| 14| 21| 30|
| 6| 10| 15| 22| 31|
| 7| 11| 16| 23| 32|
| 8| 12| 17| 24| 33|
| 9| 13| 18| 25| 34|
| 10| 14| 19| 26| 35|
+----+-----+-----+-----+-----+
**Question:** How can I rewrite the above loop to be more efficient?
I've noticed that my code runs slowly because Spark spends a lot of time on each group of loops (even on small datasets, e.g. 2 GB of text input).
Thanks
You can add multiple columns to a PySpark DataFrame in several ways. If you want to add a known set of columns, you can easily do it by chaining withColumn() or by using select().
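For example, a minimal sketch of both approaches on the test DataFrame from the question (the column names val_a and val_b are just illustrative):

from pyspark.sql import functions as F

# Chaining withColumn() for a known, fixed set of columns
chained = test.withColumn('val_a', test.val1 + 1).withColumn('val_b', test.val1 * 2)

# The same columns added in a single select() projection
selected = test.select('*',
                       (test.val1 + 1).alias('val_a'),
                       (test.val1 * 2).alias('val_b'))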
In pandas, apply is not faster in itself, but it has advantages when used in combination with DataFrames; how much it helps depends on the content of the apply expression. If it can be executed in Cython space, apply is much faster (which is the case here). We can use apply with a lambda function.
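For reference, a minimal pandas sketch of apply with a lambda on the original Pandas data (the val_2 column is just an illustrative name):

import pandas as pd
import numpy as np

pdf = pd.DataFrame({'val1': np.arange(1, 11)})
# apply a lambda to each element of the val1 Series
pdf['val_2'] = pdf['val1'].apply(lambda x: x + 2 ** 2)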
You can use reduce, for loops, or list comprehensions to apply PySpark functions to multiple columns in a DataFrame. Using iterators to apply the same operation on multiple columns is vital for maintaining a DRY codebase.
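For example, a sketch of the reduce and list-comprehension patterns that reproduces the val_2 .. val_5 columns from the question:

from functools import reduce
from pyspark.sql import functions as F

# functools.reduce folds repeated withColumn() calls over the loop values
reduced = reduce(
    lambda df, i: df.withColumn('val_' + str(i), F.lit(i ** 2) + df.val1),
    range(2, 6),
    test)

# A list comprehension builds all column expressions for one select()
comprehended = test.select(
    '*',
    *[(F.lit(i ** 2) + test.val1).alias('val_' + str(i)) for i in range(2, 6)])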
There is a small overhead in repeatedly calling JVM methods, but otherwise the for loop alone shouldn't be a problem. You can improve it slightly by using a single select:
df = sqlContext.range(1, 11).toDF("val1")  # use spark.range(...) on Spark 2.x+

def make_col(i):
    return (F.pow(F.lit(i), 2) + F.col("val1")).alias("val_{0}".format(i))

df = df.select("*", *(make_col(i) for i in range(2, 6)))
I would also avoid using NumPy types. Initializing NumPy objects is typically more expensive than creating plain Python objects, and Spark SQL doesn't support NumPy types, so some additional conversions are required.
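For example, the loop values from the question can come from a plain Python range instead of np.arange(...).tolist(), so only built-in ints are passed to F.lit:

# plain Python ints avoid NumPy-to-Python conversions in F.lit
for i in range(2, 6):
    test = test.withColumn('val_' + str(i), F.lit(i ** 2) + test.val1)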
One withColumn call operates on the entire RDD, so it's generally not good practice to use that method for every column you want to add. Instead, you can work with the columns and their data inside a map function; since a single map function does the job, the code that adds the new columns and their data runs in parallel (a consolidated PySpark sketch follows the steps below).
a. Gather the new values based on your calculations.
b. Add these new column values to the main RDD as below:
val newColumns: Seq[Any] = Seq(newcol1, newcol2)
Row.fromSeq(row.toSeq.init ++ newColumns)
Here, row is the reference to the current row inside the map function.
c. Create the new schema as below:
val newColumnsStructType = StructType(Seq(StructField("newcolName1", IntegerType), StructField("newColName2", IntegerType)))
d. Add it to the old schema:
val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType)
e. Create a new DataFrame with the new columns:
val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema)
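The snippets above are Scala; a minimal PySpark sketch of the same map-based idea, using the test DataFrame from the question (the add_cols helper and the val_2 .. val_5 column names are illustrative, and all original columns are kept rather than dropping the last one), could look like this:

from pyspark.sql.types import StructType, StructField, LongType

# compute all new values for each row in a single map pass
def add_cols(row):
    new_values = [row.val1 + i ** 2 for i in range(2, 6)]
    return tuple(row) + tuple(new_values)

# extend the original schema with the new columns
new_fields = [StructField('val_' + str(i), LongType()) for i in range(2, 6)]
new_schema = StructType(list(test.schema.fields) + new_fields)

new_df = sqlContext.createDataFrame(test.rdd.map(add_cols), new_schema)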