
Spark copying dataframe columns best practice in Python/PySpark?


This is for Python/PySpark using Spark 2.3.2. I am looking for the best-practice approach for copying the columns of one data frame to another data frame using Python/PySpark, for a very large data set of 10+ billion rows (partitioned evenly by year/month/day). Each row has 120 columns to transform/copy. The output data frame will be written, date-partitioned, into another set of parquet files.

Example schema is: input DFinput (colA, colB, colC) and output DFoutput (X, Y, Z)

I want to copy DFinput to DFoutput as follows (colA => Z, colB => X, colC => Y).

What is the best practice to do this in Python Spark 2.3+? Should I use the DF.withColumn() method for each column to copy source into destination columns? Will this perform well given billions of rows, each with 120 columns to copy?
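
For illustration, by the withColumn() approach I mean roughly the following sketch (the column names are just the three-column example schema above, not the real 120-column mapping):

from pyspark.sql.functions import col

# Sketch only: copy each source column into a new destination column,
# then drop the originals.
col_map = {"colA": "Z", "colB": "X", "colC": "Y"}

DFoutput = DFinput
for src, dst in col_map.items():
    DFoutput = DFoutput.withColumn(dst, col(src))
DFoutput = DFoutput.drop(*col_map.keys())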

Thank you

Asked by Acid Rider on Dec 19 '18

2 Answers

Another way to handle column mapping in PySpark is via a dictionary. A dictionary lets you map the columns of the initial dataframe to the columns of the final dataframe using a key/value structure, as shown below:

from pyspark.sql.functions import col

df = spark.createDataFrame([
  [1, "John", "2019-12-01 10:00:00"],
  [2, "Michael", "2019-12-01 11:00:00"],
  [2, "Michael", "2019-12-01 11:01:00"],
  [3, "Tom", "2019-11-13 20:00:00"],
  [3, "Tom", "2019-11-14 00:00:00"],
  [4, "Sofy", "2019-10-01 01:00:00"]
], ["A", "B", "C"])


col_map = {"A":"Z", "B":"X", "C":"Y"}

df.select(*[col(k).alias(col_map[k]) for k in col_map]).show()

# +---+-------+-------------------+
# |  Z|      X|                  Y|
# +---+-------+-------------------+
# |  1|   John|2019-12-01 10:00:00|
# |  2|Michael|2019-12-01 11:00:00|
# |  2|Michael|2019-12-01 11:01:00|
# |  3|    Tom|2019-11-13 20:00:00|
# |  3|    Tom|2019-11-14 00:00:00|
# |  4|   Sofy|2019-10-01 01:00:00|
# +---+-------+-------------------+

Here we map A, B, C into Z, X, Y respectively.
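
As a side note for the 120-column case in the question, the mapping dictionary does not have to be written by hand; assuming you keep the source and destination names in two equally ordered lists (the names below are just the example schema), you could build it programmatically:

# Hypothetical lists of source/destination column names, kept in the same
# order; extend these to the full 120-column mapping.
src_cols = ["A", "B", "C"]
dst_cols = ["Z", "X", "Y"]

col_map = dict(zip(src_cols, dst_cols))   # {"A": "Z", "B": "X", "C": "Y"}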

And if you want a modular solution, you can also put everything inside a function:

def transform_cols(mappings, df):
  return df.select(*[col(k).alias(mappings[k]) for k in mappings])
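
For example, reusing df and col_map from above:

transform_cols(col_map, df).show()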

Or make it even more modular by using monkey patching to extend the existing functionality of the DataFrame class. Place the following code at the top of your PySpark code (you can also create a mini library and include it in your code when needed):

from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def transform_cols(self, mappings):
  return self.select(*[col(k).alias(mappings[k]) for k in mappings])

# Attach the function as a method on every DataFrame instance.
# (Note: Spark 3.0+ ships a built-in DataFrame.transform, which this overrides.)
DataFrame.transform = transform_cols

Then call it with:

df.transform(col_map).show()

PS: This can be a convenient way to extend DataFrame functionality: create your own libraries and expose them on the DataFrame via monkey patching (similar to extension methods, for those familiar with C#).

Answered by abiratsis on Dec 17 '22

The Apache Spark approach, as far as I understand your problem, is to transform your input DataFrame into the desired output DataFrame. You can simply use selectExpr on the input DataFrame for that task:

outputDF = inputDF.selectExpr("colB as X", "colC as Y", "colA as Z")

This transformation will not "copy" data from the input DataFrame to the output DataFrame: DataFrames are immutable and lazily evaluated, so selectExpr only defines a new projection over the same underlying data, and nothing is materialized until an action (such as writing the output) runs.
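
If you also need to write the result date-partitioned, as in the question, a minimal sketch would look like the following (the partition column names year/month/day and the output path are assumptions; carry the partition columns through the select so that partitionBy can see them):

# Minimal sketch, not a drop-in solution: partition column names and the
# output path are assumptions based on the question.
outputDF = inputDF.selectExpr("colB as X", "colC as Y", "colA as Z",
                              "year", "month", "day")

(outputDF.write
    .mode("overwrite")
    .partitionBy("year", "month", "day")
    .parquet("/path/to/output"))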

Answered by effemm on Dec 17 '22