I have data from many IoT sensors. For each sensor there are only about 100 rows in the dataframe, so the data is not skewed. I'm training an individual machine learning model for each sensor. I'm successfully using a pandas UDF to train the different models and log their metrics to mlflow in parallel (supposedly), as taught here.
Using Databricks on Azure with a single-node cluster (Standard_DS3_v2, 14GB of memory, 4 cores), I was able to finish all the training in about 23 min.
Since a pandas UDF supposedly does the computation in parallel for each group, I thought I could finish the training faster by using a single-node cluster with more cores, or by using a cluster with more workers. So I ran the same notebook with: (1) a single-node cluster with more cores, and (2) a cluster with more workers.
To my surprise, the training time did not decrease: 23 min for option 1 and 26.5 min for option 2.
I tried to use the newer applyInPandas, but the result was roughly the same.
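For reference, the two grouped-map call forms look roughly like this (just a sketch: df, run_id and train_model refer to the code shown further down, result_legacy/result_new are placeholder names, and the decorator form is the legacy API, deprecated since Spark 3.0):
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Legacy grouped-map pandas UDF: the output schema and the function type are
# declared in the decorator, and the UDF is passed to GroupedData.apply.
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def train_model_udf(df_pandas):
    return train_model(df_pandas)

result_legacy = (df
    .withColumn("run_id", f.lit(run_id))
    .groupby("Princípio_Ativo")
    .apply(train_model_udf)
)

# Newer equivalent: applyInPandas takes a plain Python function plus an explicit output schema.
result_new = (df
    .withColumn("run_id", f.lit(run_id))
    .groupby("Princípio_Ativo")
    .applyInPandas(train_model, schema=df.schema)
)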
NB: After @Chris's answer, checking the Stage Detail page in the Spark Web UI (for the cluster with 1 master + 3 workers), I see that there is only one stage responsible for the pandas UDF training, and it took 20 min. Accessing the details of this stage, I see it had only one task, with {'Locality Level': 'PROCESS_LOCAL'}, which took the whole 20 min. Screenshots below.
So @Chris has identified the problem: the training is not being parallelized.
In order to understand why applyInPandas (or the pandas UDF) is not being parallelized, I've put my code below (the applyInPandas version). Note that my goal is only to log the metrics of the trained models with mlflow, so the function just returns the original df it receives.
Note also that the code works as expected: mlflow logs the training successfully. My only issue is that it's not being parallelized.
I have a feeling the issue is with the for loop, since it differs from the tutorial.
import pyspark.sql.functions as f
import mlflow
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error
import pmdarima as pm
from statsmodels.tsa.statespace.sarimax import SARIMAX
def train_model(df_pandas):
    '''
    Trains a model on grouped instances
    '''
    original_df = df_pandas.copy()  # the original df will be returned at the end
    PA = df_pandas['Princípio_Ativo'].iloc[0]
    run_id = df_pandas['run_id'].iloc[0]  # pulls the run ID to do a nested run
    observacoes_no_teste = 12
    horizonte = 1
    observacoes_total = len(df_pandas.index)
    observacoes_no_train = len(df_pandas.index) - observacoes_no_teste
    try:
        # train/test split
        X = df_pandas[:observacoes_no_train]['Demanda']
        y = df_pandas[observacoes_no_train:]['Demanda']
        # Train the model to find the (seasonal) order
        model = pm.auto_arima(X, seasonal=True, m=12)
        order = model.get_params()['order']
        seasonal_order = model.get_params()['seasonal_order']
    except:
        pass
    # Resume the top-level training
    with mlflow.start_run(run_id=run_id, experiment_id=1333367041812290):
        # Create a nested run for the specific device
        with mlflow.start_run(run_name=str(PA), nested=True, experiment_id=1333367041812290) as run:
            mae_list = []
            mse_list = []
            previsoes_list = []
            teste_list = []
            predictions_list = []
            try:
                # Backtesting: the model is trained with n observations and the (n+1)th
                # observation is predicted; n is increased by 1 on each iteration.
                for i in range(observacoes_total - observacoes_no_train - horizonte + 1):
                    # train/test split
                    X = df_pandas[:observacoes_no_train + i]['Demanda']
                    y = df_pandas[observacoes_no_train + i:observacoes_no_train + i + horizonte]['Demanda']
                    # train model
                    model = SARIMAX(X, order=order, seasonal_order=seasonal_order)
                    model = model.fit()
                    # make predictions
                    predictions = model.predict(start=observacoes_no_train + i, end=(observacoes_no_train + i + horizonte - 1))
                    predictions_list.append(predictions)
                    mse = round(mean_squared_error(y, predictions), 2)
                    mae = round(mean_absolute_error(y, predictions), 2)
                    mse_list.append(mse)
                    mae_list.append(mae)
                # series with the in-sample predictions
                in_sample_predictions = pd.concat(predictions_list)
                in_sample_predictions.name = 'in_sample'
                # out-of-sample predictions
                hp = 3
                out_of_sample_predictions = model.predict(start=observacoes_total, end=(observacoes_total + hp - 1))
                out_of_sample_predictions.name = 'out_sample'
                # in-sample + out-of-sample predictions
                df_predictions = pd.concat([df_pandas.drop('run_id', axis=1), in_sample_predictions, out_of_sample_predictions], axis=1)
                # save df with predictions to be logged as an artifact by mlflow
                df_predictions.to_csv('df_predictions.csv')
                # mlflow logging
                mlflow.log_param("Princípio_Ativo", PA)
                mlflow.log_param("mae_list", str(mae_list))
                mlflow.log_param("mse_list", str(mse_list))
                mlflow.log_param("status_sucesso", 'sim')
                mlflow.log_artifact('df_predictions.csv')
            except:
                mlflow.log_param("status_falha", 'sim')
    return original_df.drop('run_id', axis=1)
with mlflow.start_run(run_name="SARIMA", experiment_id=1333367041812290) as run:
run_id = run.info.run_uuid
modelDirectoriesDF = (df
.withColumn("run_id", f.lit(run_id)) # Add run_id
.groupby("Princípio_Ativo")
.applyInPandas(train_model, schema=df.schema)
)
combinedDF = (df
.join(modelDirectoriesDF, on="Princípio_Ativo", how="left")
)
display(combinedDF)
Screenshots from the Spark UI:
Correct. A stage with only one task is not parallelised, which explains why the runtime does not decrease when you add more cores or nodes to the cluster.
Your input dataset is small (69KB), so unless you explicitly repartition, Spark will read it into a single partition, because it is far below the default maximum partition size of 128MB (dictated by the spark.sql.files.maxPartitionBytes parameter). As such, the whole dataframe is assigned to a single task.
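As a quick sanity check (a sketch assuming your source dataframe is the df from your code), you can confirm that the grouped training starts from a single partition, and therefore from a single task:
# Number of partitions of the input dataframe; with a ~69KB input and the
# default spark.sql.files.maxPartitionBytes of 128MB, this is expected to be 1.
print(df.rdd.getNumPartitions())

# Default maximum partition size used when reading files (128MB unless changed).
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))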
Repartitioning the input by the device column should provide the parallel training you are looking for.
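A minimal sketch of that change, assuming the same df and train_model as in your code (the partition count of 4 is only illustrative; pick something in line with the number of cores/task slots in your cluster):
# Spread the sensor groups over several partitions before the grouped-map call,
# so that applyInPandas can run one task per partition in parallel.
n_partitions = 4  # illustrative; match the total number of cores in your cluster

modelDirectoriesDF = (df
    .repartition(n_partitions, "Princípio_Ativo")  # hash-partition by the grouping column
    .withColumn("run_id", f.lit(run_id))
    .groupby("Princípio_Ativo")
    .applyInPandas(train_model, schema=df.schema)
)
Each task then processes only the groups that hash to its partition, so with enough partitions (and cores) the per-sensor models are trained concurrently.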