
Perform PCA on each group of a groupBy in PySpark

I am looking for a way to run the spark.ml.feature.PCA function over grouped data returned from a groupBy() call on a dataframe. But I'm not sure if this is possible, or how to achieve it. This is a basic example that hopefully illustrates what I want to do:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA   

df = spark.createDataFrame([[3, 1, 1], [4, 2, 1], [5, 2, 1], [3, 3, 2], [6, 2, 2], [4, 4, 2]], ["Value1", "Value2",  "ID"])

df.show()
+------+------+---+
|Value1|Value2| ID|
+------+------+---+
|     3|     1|  1|
|     4|     2|  1|
|     5|     2|  1|
|     3|     3|  2|
|     6|     2|  2|
|     4|     4|  2|
+------+------+---+

assembler = VectorAssembler(inputCols=["Value1", "Value2"], outputCol="features")

df2 = assembler.transform(df)

df2.show()
+------+------+---+---------+
|Value1|Value2| ID| features|
+------+------+---+---------+
|     3|     1|  1|[3.0,1.0]|
|     4|     2|  1|[4.0,2.0]|
|     5|     2|  1|[5.0,2.0]|
|     3|     3|  2|[3.0,3.0]|
|     6|     2|  2|[6.0,2.0]|
|     4|     4|  2|[4.0,4.0]|
+------+------+---+---------+

pca = PCA(k=1, inputCol="features", outputCol="component")

At this point I have the dataframe and the pca object that I want to use. I would now like to perform PCA on the dataframe, but grouped by "ID", so that I get the PCA for all of the features with ID 1 and the PCA for all of the features with ID 2, returning just the components. I can get these manually by:

>>> pca.fit(df2.where("ID==1")).pc
DenseMatrix(2, 1, [-0.8817, -0.4719], 0)
>>> pca.fit(df2.where("ID==2")).pc
DenseMatrix(2, 1, [-0.8817, 0.4719], 0)

But I would like to run this over all of the different IDs in the dataframe in parallel, something like:

df2.groupBy("ID").map(lambda group: pca.fit(group).pc)

But you can't use map() on grouped data like this. Is there a way to achieve this?

asked Jul 21 '17 by Tim B


1 Answer

Spark>=3.0.0

As of Spark 3.0.0, you can use applyInPandas to apply a plain Python function to each group of a DataFrame and get the result back as another DataFrame. You only need to define the output schema of the DataFrame that the function returns.

Here I use scikit-learn's PCA instead of the Spark implementation, because inside the grouped function you work with ordinary pandas DataFrames, not Spark ones. The principal components found should be the same either way.

import pandas as pd
from sklearn.decomposition import PCA
from pyspark.sql.types import StructField, StructType, DoubleType


# define PCA parameters
cols = ['Value1', 'Value2']
pca_components = 1


# define Python function
def pca_udf(pdf):
    X = pdf[cols]
    pca = PCA(n_components=pca_components)
    PC = pca.fit_transform(X)
    PC_df = pd.DataFrame(PC, columns=['PC_' + str(i+1) for i in range(pca_components)])
    result = pd.concat([pdf, PC_df], axis=1, ignore_index=True)
    return result


# define output schema; principal components are generated dynamically based on `pca_components`
to_append = [StructField('PC_' + str(i+1), DoubleType(), True) for i in range(pca_components)]
output_schema = StructType(df.schema.fields + to_append)


df\
  .groupby('ID')\
  .applyInPandas(pca_udf, output_schema)\
  .show()

+------+------+---+-------------------+
|Value1|Value2| ID|               PC_1|
+------+------+---+-------------------+
|     3|     1|  1| 1.1962465491226262|
|     4|     2|  1|-0.1572859751773413|
|     5|     2|  1|-1.0389605739452852|
|     3|     3|  2|-1.1755661316905914|
|     6|     2|  2|  1.941315590145264|
|     4|     4|  2|-0.7657494584546719|
+------+------+---+-------------------+
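If what you actually want back is the principal-component loadings per group (the pc matrix from the question) rather than the projected values, the same applyInPandas mechanism works; you just return one row per group. Below is a minimal sketch along the same lines; the function name pc_loadings and the loading_Value1/loading_Value2 column names are illustrative, and scikit-learn may flip the sign of a component compared to Spark ML's PCA.

import pandas as pd
from sklearn.decomposition import PCA
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

cols = ['Value1', 'Value2']

# one row per group: the group ID plus the loadings of the first principal component
loadings_schema = StructType([
    StructField('ID', LongType(), True),
    StructField('loading_Value1', DoubleType(), True),
    StructField('loading_Value2', DoubleType(), True)
])

def pc_loadings(pdf):
    pca = PCA(n_components=1)
    pca.fit(pdf[cols])
    loadings = pca.components_[0]  # shape (n_features,): one loading per input column
    return pd.DataFrame(
        [[pdf['ID'].iloc[0], float(loadings[0]), float(loadings[1])]],
        columns=['ID', 'loading_Value1', 'loading_Value2']
    )

df.groupby('ID').applyInPandas(pc_loadings, loadings_schema).show()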

Spark<3.0.0

Before Spark 3.0.0, but still with Spark >= 2.3.0, the solution is similar, except that you need to define a pandas_udf: a vectorized user-defined function that Spark executes using Arrow to transfer the data and pandas to process it. The ideas behind it are the same as above.

import pandas as pd
from sklearn.decomposition import PCA
from pyspark.sql.types import StructField, StructType, DoubleType
from pyspark.sql.functions import pandas_udf, PandasUDFType


# wrapper function that builds the pandas_udf, so the columns and number of components can be passed as parameters
def pca_by_group(df, cols, pca_components=1):
    # build output schema for the Pandas UDF
    # principal components are generated dynamically based on `pca_components`
    to_append = [StructField('PC_' + str(i+1), DoubleType(), True) for i in range(pca_components)]
    output_schema = StructType(df.schema.fields + to_append)

    # Pandas UDF for applying PCA within each group
    @pandas_udf(output_schema, functionType=PandasUDFType.GROUPED_MAP)
    def pca_udf(pdf):
        X = pdf[cols]
        pca = PCA(n_components=pca_components)
        PC = pca.fit_transform(X)
        PC_df = pd.DataFrame(PC, columns=['PC_' + str(i+1) for i in range(pca_components)])
        result = pd.concat([pdf, PC_df], axis=1, ignore_index=True)
        return result
    
    # apply the Pandas UDF
    df = df\
        .groupby('ID')\
        .apply(pca_udf)
    
    return df


new_df = pca_by_group(df, cols=['Value1', 'Value2'], pca_components=1)
new_df.show()

+------+------+---+-------------------+
|Value1|Value2| ID|               PC_1|
+------+------+---+-------------------+
|     3|     1|  1| 1.1962465491226262|
|     4|     2|  1|-0.1572859751773413|
|     5|     2|  1|-1.0389605739452852|
|     3|     3|  2|-1.1755661316905914|
|     6|     2|  2|  1.941315590145264|
|     4|     4|  2|-0.7657494584546719|
+------+------+---+-------------------+
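Since the number of components is just a parameter of pca_by_group, keeping both components is only a matter of passing pca_components=2; the output then gains a PC_2 column alongside PC_1. For example:

# keep both principal components instead of just the first one
new_df2 = pca_by_group(df, cols=['Value1', 'Value2'], pca_components=2)
new_df2.show()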
answered Oct 14 '22 by Ric S