I am looking for a way to run the spark.ml.feature.PCA function over grouped data returned from a groupBy() call on a DataFrame, but I'm not sure if this is possible or how to achieve it. Here is a basic example that hopefully illustrates what I want to do:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA
df = spark.createDataFrame([[3, 1, 1], [4, 2, 1], [5, 2, 1], [3, 3, 2], [6, 2, 2], [4, 4, 2]], ["Value1", "Value2", "ID"])
df.show()
+------+------+---+
|Value1|Value2| ID|
+------+------+---+
|     3|     1|  1|
|     4|     2|  1|
|     5|     2|  1|
|     3|     3|  2|
|     6|     2|  2|
|     4|     4|  2|
+------+------+---+
assembler = VectorAssembler(inputCols=["Value1", "Value2"], outputCol="features")
df2 = assembler.transform(df)
df2.show()
+------+------+---+---------+
|Value1|Value2| ID| features|
+------+------+---+---------+
|     3|     1|  1|[3.0,1.0]|
|     4|     2|  1|[4.0,2.0]|
|     5|     2|  1|[5.0,2.0]|
|     3|     3|  2|[3.0,3.0]|
|     6|     2|  2|[6.0,2.0]|
|     4|     4|  2|[4.0,4.0]|
+------+------+---+---------+
pca = PCA(k=1, inputCol="features", outputCol="component")
At this point I have the DataFrame and the pca object that I want to use. I would now like to perform PCA on the DataFrame, but grouped by "ID", so I would get the PCA for the features with ID 1 and the PCA for the features with ID 2, returning just the components. I can get these manually with:
>>> pca.fit(df2.where("ID==1")).pc
DenseMatrix(2, 1, [-0.8817, -0.4719], 0)
>>> pca.fit(df2.where("ID==2")).pc
DenseMatrix(2, 1, [-0.8817, 0.4719], 0)
But I would like to run this over all of the different IDs in the dataframe in parallel, something like:
df2.groupBy("ID").map(lambda group: pca.fit(group).pc)
But you can't use map() on grouped data like this. Is there a way to achieve this?
As of Spark 3.0.0, you can use applyInPandas to apply a plain Python function to each group of the DataFrame and return the result as another DataFrame. You basically need to define the output schema of the returned DataFrame.
Here I will use scikit-learn's PCA instead of the Spark implementation, since within each group the function receives a plain pandas DataFrame, not a Spark one. The principal components found should be the same anyway (possibly up to a sign flip).
import pandas as pd
from sklearn.decomposition import PCA
from pyspark.sql.types import StructField, StructType, DoubleType
# define PCA parameters
cols = ['Value1', 'Value2']
pca_components = 1
# define Python function
def pca_udf(pdf):
    X = pdf[cols]
    pca = PCA(n_components=pca_components)
    PC = pca.fit_transform(X)
    PC_df = pd.DataFrame(PC, columns=['PC_' + str(i+1) for i in range(pca_components)])
    # ignore_index=True drops the column labels, so the returned columns
    # are matched to the output schema by position
    result = pd.concat([pdf, PC_df], axis=1, ignore_index=True)
    return result
# define output schema; principal components are generated dynamically based on `pca_components`
to_append = [StructField('PC_' + str(i+1), DoubleType(), True) for i in range(pca_components)]
output_schema = StructType(df.schema.fields + to_append)
df\
.groupby('ID')\
.applyInPandas(pca_udf, output_schema)\
.show()
+------+------+---+-------------------+
|Value1|Value2| ID|               PC_1|
+------+------+---+-------------------+
|     3|     1|  1| 1.1962465491226262|
|     4|     2|  1|-0.1572859751773413|
|     5|     2|  1|-1.0389605739452852|
|     3|     3|  2|-1.1755661316905914|
|     6|     2|  2|  1.941315590145264|
|     4|     4|  2|-0.7657494584546719|
+------+------+---+-------------------+
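The example above returns the transformed values. If, as in the question, you want just the components (the equivalent of pca.fit(...).pc) for each group, the same grouped pattern works: have the function return the fitted loadings instead of the scores. Below is a minimal sketch under the same assumptions (scikit-learn, the df from the question); the long output format of one row per (ID, component, feature) and the names loadings_schema and pca_loadings_udf are just illustrative choices.
import pandas as pd
from sklearn.decomposition import PCA
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

cols = ['Value1', 'Value2']
pca_components = 1

# one output row per (ID, principal component, input feature) holding the loading
loadings_schema = StructType([
    StructField('ID', LongType(), True),
    StructField('component', LongType(), True),
    StructField('feature', StringType(), True),
    StructField('loading', DoubleType(), True),
])

def pca_loadings_udf(pdf):
    pca = PCA(n_components=pca_components)
    pca.fit(pdf[cols])
    group_id = int(pdf['ID'].iloc[0])            # every row in the group shares the same ID
    rows = []
    for i, comp in enumerate(pca.components_):   # components_ has shape (k, n_features)
        for feature, loading in zip(cols, comp):
            rows.append((group_id, i + 1, feature, float(loading)))
    return pd.DataFrame(rows, columns=['ID', 'component', 'feature', 'loading'])

df.groupby('ID').applyInPandas(pca_loadings_udf, loadings_schema).show()
Note that scikit-learn's loadings may differ from Spark ML's pca.fit(...).pc by a sign flip, since principal components are only defined up to sign.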
Before Spark 3.0.0 (but still with Spark >= 2.3.0), the solution is similar, but we need to explicitly define a pandas_udf, a vectorized user-defined function executed by Spark, which uses Arrow to transfer the data and pandas to work with it. The concepts needed to define it are the same as above.
import pandas as pd
from sklearn.decomposition import PCA
from pyspark.sql.types import StructField, StructType, DoubleType
from pyspark.sql.functions import pandas_udf, PandasUDFType
# macro-function that wraps the pandas_udf and allows passing it some parameters
def pca_by_group(df, cols, pca_components=1):
    # build output schema for the Pandas UDF;
    # principal components are generated dynamically based on `pca_components`
    to_append = [StructField('PC_' + str(i+1), DoubleType(), True) for i in range(pca_components)]
    output_schema = StructType(df.schema.fields + to_append)

    # Pandas UDF for applying PCA within each group
    @pandas_udf(output_schema, functionType=PandasUDFType.GROUPED_MAP)
    def pca_udf(pdf):
        X = pdf[cols]
        pca = PCA(n_components=pca_components)
        PC = pca.fit_transform(X)
        PC_df = pd.DataFrame(PC, columns=['PC_' + str(i+1) for i in range(pca_components)])
        # ignore_index=True drops the column labels, so the returned columns
        # are matched to the output schema by position
        result = pd.concat([pdf, PC_df], axis=1, ignore_index=True)
        return result

    # apply the Pandas UDF to each group
    df = df\
        .groupby('ID')\
        .apply(pca_udf)
    return df
new_df = pca_by_group(df, cols=['Value1', 'Value2'], pca_components=1)
new_df.show()
+------+------+---+-------------------+
|Value1|Value2| ID|               PC_1|
+------+------+---+-------------------+
|     3|     1|  1| 1.1962465491226262|
|     4|     2|  1|-0.1572859751773413|
|     5|     2|  1|-1.0389605739452852|
|     3|     3|  2|-1.1755661316905914|
|     6|     2|  2|  1.941315590145264|
|     4|     4|  2|-0.7657494584546719|
+------+------+---+-------------------+
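If you want to cross-check these results against the Spark implementation from the question, you can still fit spark.ml.feature.PCA per ID in a plain driver-side loop; this runs the groups sequentially, so it is only a sanity check rather than a replacement for the grouped approaches above. A sketch, assuming the df2 with the assembled features column from the question (the import is aliased to avoid clashing with scikit-learn's PCA):
# sequential cross-check with Spark ML's PCA per ID (not parallel over groups)
from pyspark.ml.feature import PCA as SparkMLPCA

spark_pca = SparkMLPCA(k=1, inputCol="features", outputCol="component")

ids = [row["ID"] for row in df2.select("ID").distinct().collect()]
for group_id in ids:
    model = spark_pca.fit(df2.where(df2["ID"] == group_id))
    # the loadings should match the scikit-learn ones up to sign
    print(group_id, model.pc)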