Using PySpark, I would like to apply k-means separately to each group of a DataFrame rather than to the whole DataFrame at once. For the moment I use a for loop that iterates over the groups, applies k-means to each one, and appends the result to another table. With a lot of groups this is very time-consuming. Could anyone help, please? Thanks a lot!
for customer in customer_list:
    temp_df = togroup.filter(col("customer_id") == customer)
    df = assembler.transform(temp_df)
    k = 1
    mtric = 0  # assumed starting value, reset for each customer
    # `and`, not `&`: `&` binds tighter than `<` and breaks the comparison
    while k < 5 and mtric < width:
        k += 1
        kmeans = KMeans(k=k, seed=5, maxIter=20, initSteps=5)
        model = kmeans.fit(df)
        mtric = 1 - model.computeCost(df) / ttvar
    a = model.transform(df).select(cols)
    allcustomers = allcustomers.union(a)
I came up with a solution using pandas_udf. A pure Spark or Scala solution would be preferable, but none has been offered yet. Assume my data is:
import pandas as pd
df_pd = pd.DataFrame(
    [['cat1', 10.], ['cat1', 20.], ['cat1', 11.], ['cat1', 21.], ['cat1', 22.], ['cat1', 9.],
     ['cat2', 101.], ['cat2', 201.], ['cat2', 111.], ['cat2', 214.], ['cat2', 224.], ['cat2', 99.]],
    columns=['cat', 'val'])
df_spark = spark.createDataFrame(df_pd)
First solve the problem in pandas:
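import numpy as np
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=2, random_state=0)
def skmean(kmeans, x):
    # cluster on the numeric column only; the group may also carry the string column 'cat'
    X = np.array(x[['val']])
    kmeans.fit(X)
    return kmeans.predict(X)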
You can apply skmean() to a pandas DataFrame first, to make sure it works properly:
df_pd.groupby('cat').apply(lambda x:skmean(kmeans,x)).reset_index()
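Note that skmean() fixes k at 2, whereas the loop in the question grows k until a metric passes a threshold. Below is a rough sketch of how that stopping rule might be folded into a per-group function. It assumes that `ttvar` in the question is the total sum of squares of the group and that `width` is a threshold on the explained share of variance; neither is defined in the question, so treat both readings as guesses. The name `pick_k_labels` and its parameters are made up for illustration.

```python
import numpy as np
from sklearn.cluster import KMeans


def pick_k_labels(values, threshold=0.9, max_k=5, seed=0):
    """Grow k from 2 until the explained share of variance reaches
    `threshold` or k hits `max_k`; return (k, labels)."""
    X = np.asarray(values, dtype=float).reshape(-1, 1)
    # Total sum of squares around the group mean (assumed meaning of `ttvar`).
    total_ss = float(((X - X.mean(axis=0)) ** 2).sum())
    k, explained = 1, 0.0
    labels = np.zeros(len(X), dtype=int)
    # A group cannot have more clusters than rows.
    limit = min(max_k, len(X))
    while k < limit and explained < threshold and total_ss > 0:
        k += 1
        model = KMeans(n_clusters=k, random_state=seed, n_init=10).fit(X)
        # inertia_ is the within-cluster sum of squares, the scikit-learn
        # counterpart of the cost used in the question's loop.
        explained = 1.0 - model.inertia_ / total_ss
        labels = model.labels_
    return k, labels
```

Something like `df_pd.groupby('cat')['val'].apply(lambda v: pick_k_labels(v)[0])` should then report the k chosen for each group.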
To apply the function to a PySpark DataFrame, we use pandas_udf. But first define a schema for the output DataFrame:
from pyspark.sql.types import *
schema = StructType([
    StructField('cat', StringType(), True),
    StructField('clusters', ArrayType(IntegerType()))])
Convert the function above to a pandas_udf:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def skmean_udf(df):
    # one row per 'cat' holding that group's array of cluster labels
    result = df.groupby('cat').apply(lambda x: skmean(kmeans, x))
    # name the column so it matches the 'clusters' field of the schema
    result = result.rename('clusters').reset_index()
    return result
You can use the function as follows:
df_spark.groupby('cat').apply(skmean_udf).show()
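On Spark 3.0 and later, `PandasUDFType.GROUPED_MAP` is deprecated in favour of `GroupedData.applyInPandas`. A minimal sketch of the same idea in that style follows. It returns one row per input row with a `cluster` column instead of one array per group, which is a different output shape from the schema above. The names `label_group` and `cluster_per_group` are made up for illustration, and k is fixed at 2 as in skmean().

```python
import pandas as pd
from sklearn.cluster import KMeans


def label_group(pdf: pd.DataFrame) -> pd.DataFrame:
    """Cluster one group's 'val' column and return its rows with a label."""
    model = KMeans(n_clusters=2, random_state=0, n_init=10)
    out = pdf[["cat", "val"]].copy()
    # int32 matches the `int` type declared in the Spark schema below.
    out["cluster"] = model.fit_predict(out[["val"]].to_numpy()).astype("int32")
    return out


def cluster_per_group(sdf):
    """Run label_group once per 'cat' on a Spark DataFrame (Spark >= 3.0)."""
    return sdf.groupby("cat").applyInPandas(
        label_group, schema="cat string, val double, cluster int"
    )
```

Calling `cluster_per_group(df_spark).show()` should then print every row with its label. Each group is handed to a single Python worker as one pandas DataFrame, so a group has to fit in memory, and a group with fewer than two rows will make KMeans raise an error.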