This is for Python/PySpark using Spark 2.3.2. I am looking for a best-practice approach for copying columns of one DataFrame to another DataFrame using Python/PySpark, for a very large data set of 10+ billion rows (partitioned evenly by year/month/day). Each row has 120 columns to transform/copy. The output DataFrame will be written, date-partitioned, into another set of parquet files.
Example schema: input DFinput (colA, colB, colC) and output DFoutput (X, Y, Z).
I want to copy DFinput to DFoutput as follows (colA => Z, colB => X, colC => Y).
What is the best practice to do this in Python with Spark 2.3+? Should I use the DF.withColumn() method for each column to copy source into destination columns? Will this perform well given billions of rows, each with 120 columns to copy?
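For reference, the per-column approach I have in mind would look roughly like this (just a sketch, using the example schema above):

from pyspark.sql.functions import col

# Copy each source column into its destination name, then drop the originals.
DFoutput = (DFinput
    .withColumn("Z", col("colA"))
    .withColumn("X", col("colB"))
    .withColumn("Y", col("colC"))
    .drop("colA", "colB", "colC"))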
Thank you
Another way to handle column mapping in PySpark is via a dictionary. Dictionaries let you map the columns of the initial DataFrame to the columns of the final DataFrame using a key/value structure, as shown below:
from pyspark.sql.functions import col
df = spark.createDataFrame([
    [1, "John", "2019-12-01 10:00:00"],
    [2, "Michael", "2019-12-01 11:00:00"],
    [2, "Michael", "2019-12-01 11:01:00"],
    [3, "Tom", "2019-11-13 20:00:00"],
    [3, "Tom", "2019-11-14 00:00:00"],
    [4, "Sofy", "2019-10-01 01:00:00"]
], ["A", "B", "C"])
col_map = {"A":"Z", "B":"X", "C":"Y"}
df.select(*[col(k).alias(col_map[k]) for k in col_map]).show()
# +---+-------+-------------------+
# | Z| X| Y|
# +---+-------+-------------------+
# | 1| John|2019-12-01 10:00:00|
# | 2|Michael|2019-12-01 11:00:00|
# | 2|Michael|2019-12-01 11:01:00|
# | 3| Tom|2019-11-13 20:00:00|
# | 3| Tom|2019-11-14 00:00:00|
# | 4| Sofy|2019-10-01 01:00:00|
# +---+-------+-------------------+
Here we map A, B, C into Z, X, Y respectively.
And if you want a modular solution, you can also put everything inside a function:
def transform_cols(mappings, df):
    return df.select(*[col(k).alias(mappings[k]) for k in mappings])
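Used with the col_map and df defined above, this gives the same output as the inline select:

transform_cols(col_map, df).show()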
Or, for something even more modular, use monkey patching to extend the existing functionality of the DataFrame class. Place the following code at the top of your PySpark code (you can also create a mini library and include it in your code when needed):
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def transform_cols(self, mappings):
    return self.select(*[col(k).alias(mappings[k]) for k in mappings])

DataFrame.transform = transform_cols
Then call it with:
df.transform(col_map).show()
PS: This can be a convenient way to extend DataFrame functionality: create your own libraries and expose them through the DataFrame via monkey patching (an extension method, for those familiar with C#).
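As a rough sketch of such a mini library (the module name df_extensions.py is purely illustrative), you could keep the patch in its own file and import it once at the top of your jobs:

# df_extensions.py -- illustrative module name
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def transform_cols(self, mappings):
    # Select and rename columns according to a {source: target} dict.
    return self.select(*[col(k).alias(v) for k, v in mappings.items()])

# Monkey patch: expose the helper as a DataFrame method.
DataFrame.transform = transform_cols

A simple import df_extensions at the start of a job is then enough to make df.transform(col_map) available everywhere.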
The approach using Apache Spark - as far as I understand your problem - is to transform your input DataFrame into the desired output DataFrame. You can simply use selectExpr on the input DataFrame for that task:
outputDF = inputDF.selectExpr("colB as X", "colC as Y", "colA as Z")
This transformation will not "copy" data from the input DataFrame to the output DataFrame: DataFrames are immutable and lazily evaluated, so selectExpr only defines a new logical plan over the same underlying data, and nothing is materialized until you write out the result.
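For the full use case in the question (a large column mapping plus a date-partitioned parquet output), a hedged sketch could look like the following; the paths, the contents of col_map, and the assumption that the year/month/day partition columns are carried through to the output are all placeholders:

from pyspark.sql.functions import col

# Hypothetical mapping for the 120-column case; only a few entries shown.
col_map = {"colA": "Z", "colB": "X", "colC": "Y"}

inputDF = spark.read.parquet("/path/to/input")  # placeholder path

# Build the renamed columns, then keep the partition columns for the output layout.
select_cols = [col(src).alias(dst) for src, dst in col_map.items()]
select_cols += [col(c) for c in ("year", "month", "day")]

outputDF = inputDF.select(*select_cols)

(outputDF.write
    .mode("overwrite")
    .partitionBy("year", "month", "day")
    .parquet("/path/to/output"))  # placeholder path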