I am trying to fit one ML model per partition of my dataset, and I do not know how to do it in Spark.
My dataset basically looks like this and is partitioned by Company:
Company | Features | Target
--------+----------+-------
A       | xxx      | 0.9
A       | xxx      | 0.8
A       | xxx      | 1.0
B       | xxx      | 1.2
B       | xxx      | 1.0
B       | xxx      | 0.9
C       | xxx      | 0.7
C       | xxx      | 0.9
C       | xxx      | 0.9
My goal is to train one regressor for each company, in a parallelised way (I have a few hundred million records across roughly 100k companies).
My intuition is that I need to use foreachPartition so that the partitions (i.e. my companies) are processed in parallel, with each company's model trained and saved. My main problem is how to handle the Iterator type that is passed to the function called by foreachPartition.
Here is what it would look like:
dd.foreachPartition(
  iterator => {
    // `iterator` is an Iterator[Row]; it has no .toDF() method, and a
    // DataFrame cannot be created on an executor anyway -- this is the part
    // I cannot get to work.
    val company_df = iterator.toDF()
    val rg = new RandomForestRegressor()
      .setLabelCol("target")
      .setFeaturesCol("features")
      .setNumTrees(10)
    val model = rg.fit(company_df)
    model.write.save(company_path)
  }
)
As I understand it, converting the iterator into a DataFrame is not possible, since an RDD (and therefore a DataFrame) cannot exist by itself inside a foreachPartition closure — it only exists on the driver.
I know the question is quite open but I am really stuck.
In PySpark you can do something like the following:
import pandas as pd
import statsmodels.api as sm
from pyspark.sql.functions import pandas_udf, PandasUDFType

# df has four columns: id, y, x1, x2
group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def ols(pdf):
    # Input/output are both a pandas.DataFrame
    group_key = pdf[group_column].iloc[0]
    y = pdf[y_column]
    X = pdf[x_columns]
    X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()
    # One output row per group: the group key plus the fitted coefficients
    return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]],
                        columns=[group_column] + x_columns)

beta = df.groupby(group_column).apply(ols)
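The same grouped-map pattern carries over to the original per-company regressor goal. In Spark 3.0+ the GROUPED_MAP pandas_udf style above was superseded by `DataFrame.groupby(...).applyInPandas(func, schema)`, which takes a plain Python function. Here is a minimal sketch, assuming scikit-learn is available on the workers, that the `Features` column holds fixed-length numeric arrays, and a hypothetical `MODEL_DIR` output location (none of which are from the original post):

```python
import pandas as pd
from sklearn.ensemble import RandomForestRegressor

MODEL_DIR = "/tmp/models"  # hypothetical output location, adjust as needed

def train_company_model(pdf: pd.DataFrame) -> pd.DataFrame:
    """Train (and optionally persist) one regressor on a single company's rows."""
    company = pdf["Company"].iloc[0]
    # Assumes Features holds fixed-length numeric arrays per row
    X = pd.DataFrame(pdf["Features"].tolist())
    y = pdf["Target"]
    model = RandomForestRegressor(n_estimators=10, random_state=0).fit(X, y)
    # To persist per company, e.g.: joblib.dump(model, f"{MODEL_DIR}/{company}.joblib")
    # Return one summary row per company so Spark gets a DataFrame back
    return pd.DataFrame({"Company": [company], "train_score": [model.score(X, y)]})

# Spark side (Spark >= 3.0): each company's rows arrive as one pandas
# DataFrame, and the 100k groups are trained in parallel across executors.
# result = df.groupby("Company").applyInPandas(
#     train_company_model, schema="Company string, train_score double")
```

Note that grouping by `Company` sidesteps the foreachPartition problem entirely: Spark shuffles each company's rows to a single task, so you never have to build a DataFrame inside an executor.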