How can you efficiently build one ML model per partition in Spark with foreachPartition?

I am trying to fit one ML model for each partition of my dataset, and I do not know how to do it in Spark.

My dataset basically looks like this and is partitioned by Company:

Company | Features | Target
A       | xxx      | 0.9
A       | xxx      | 0.8
A       | xxx      | 1.0
B       | xxx      | 1.2
B       | xxx      | 1.0
B       | xxx      | 0.9
C       | xxx      | 0.7
C       | xxx      | 0.9
C       | xxx      | 0.9

My goal is to train one regressor for each company, in a parallelised way (I have a few hundred million records and about 100k companies). My intuition is that I need to use foreachPartition so that the partitions (i.e. my companies) are processed in parallel, with each company's model trained and saved. My main problem is how to deal with the iterator type that is passed to the function called by foreachPartition.

Here is what it would look like:

dd.foreachPartition(
    iterator => {
        // what I would like to do -- but an iterator has no toDF():
        val company_df = iterator.toDF()
        val rf = new RandomForestRegressor()
            .setLabelCol("target")
            .setFeaturesCol("features")
            .setNumTrees(10)
        val model = rf.fit(company_df)
        model.write.save(company_path)
    }
)

As I understand it, converting the iterator into a DataFrame is not an option: a DataFrame (or RDD) cannot be created inside a foreachPartition closure, because the SparkSession only exists on the driver, not on the executors.
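
For reference, in PySpark the function passed to foreachPartition receives a plain Python iterator of Row objects, so any training inside it has to use a local, non-Spark library. A minimal sketch of that route, assuming scikit-learn is installed on the executors, a numeric Features column, and a hypothetical save path:

import pandas as pd
import joblib
from sklearn.ensemble import RandomForestRegressor

def train_partition(rows):
    # `rows` is a plain iterator of pyspark.sql.Row; there is no
    # SparkSession on the executor, so no toDF() is possible here
    pdf = pd.DataFrame([r.asDict() for r in rows])
    if pdf.empty:
        return
    # repartition("Company") hashes companies into partitions, so one
    # partition may still hold several companies; group again locally
    for company, grp in pdf.groupby("Company"):
        model = RandomForestRegressor(n_estimators=10)
        model.fit(grp[["Features"]], grp["Target"])
        joblib.dump(model, f"/models/{company}.joblib")  # hypothetical path

dd.repartition("Company").rdd.foreachPartition(train_partition)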

I know the question is quite open-ended, but I am really stuck.

1 Answer

In PySpark you can do something like the following, using a grouped-map pandas UDF that fits one OLS model per group:

import pandas as pd
import statsmodels.api as sm
from pyspark.sql.functions import pandas_udf, PandasUDFType

# df has four columns: id, y, x1, x2
group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

# Input and output are both a pandas.DataFrame
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def ols(pdf):
    group_key = pdf[group_column].iloc[0]
    y = pdf[y_column]
    X = sm.add_constant(pdf[x_columns])
    model = sm.OLS(y, X).fit()
    # one output row per group: the group key plus the fitted coefficients
    return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]],
                        columns=[group_column] + x_columns)

beta = df.groupby(group_column).apply(ols)
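
The same grouped-map pattern can be adapted to the original goal of fitting and saving one regressor per company. Below is a minimal sketch, not a definitive implementation: it assumes scikit-learn on the executors and a shared filesystem for the model files (the path and the return schema are illustrative). On Spark 3.0+ the grouped map is spelled with applyInPandas instead of the GROUPED_MAP pandas_udf:

import pandas as pd
import joblib
from sklearn.ensemble import RandomForestRegressor

def train_company(pdf):
    # each call receives all rows of one company as a pandas DataFrame
    company = pdf["Company"].iloc[0]
    model = RandomForestRegressor(n_estimators=10)
    model.fit(pdf[["Features"]], pdf["Target"])  # assumes numeric features
    path = f"/models/{company}.joblib"           # hypothetical shared path
    joblib.dump(model, path)
    return pd.DataFrame([[company, path]], columns=["Company", "model_path"])

result = dd.groupBy("Company").applyInPandas(
    train_company, schema="Company string, model_path string")
result.show()  # an action is needed to actually run the training

Each company becomes one task, so the 100k groups are trained in parallel across the cluster; the usual caveat is that every single group must fit in the memory of one executor.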