
Why is Pandas UDF not being parallelized?

I have data from many IoT sensors. For each particular sensor, there are only about 100 rows in the dataframe; the data is not skewed. I'm training an individual machine learning model for each sensor.

I'm successfully using a pandas udf to train the different models and log their mlflow metrics in parallel (supposedly), as taught here.

Using Databricks on Azure with a single-node cluster (Standard_DS3_v2 - 14GB of memory - 4 cores), I was able to finish all the training in about 23 min.

Since a pandas udf supposedly does the computation in parallel for each group, I thought I could finish the training faster by using a single-node cluster with more cores, or a cluster with more workers. So I tried to run the same notebook with:

  1. A cluster of computers: 1 master + 3 workers, all (Standard_DS3_v2 - 14GB of memory - 4 cores)
  2. A single node cluster with (Standard_DS5_v2 - 56GB of memory - 16 cores)

To my surprise, the training time did not decrease: 23 min for option 1, and 26.5 min for option 2.

I tried to use the newer applyInPandas, but the result was roughly the same.

NB: After @Chris's answer, checking the Stage Detail page in the Web UI (for the cluster with 1 master + 3 workers), I see that there is only one stage responsible for the pandas udf training. It took 20 min. Accessing the details of this stage, I see it had only one task, with {'Locality Level': 'PROCESS_LOCAL'}, and that single task took the whole 20 min. Screenshots below.

So @Chris has identified the problem: the training is not being parallelized.

To understand why applyInPandas (or the pandas udf) is not being parallelized, I put my code below (the applyInPandas version). Note that my goal is only to log the trained models' metrics with mlflow, so the function simply returns the original df it receives.

Note also that the code is working as expected: mlflow is logging the training successfully. My only question is why it's not being parallelized.

I have a feeling that the issue is with the for loop, since it differs from the tutorial.

import pyspark.sql.functions as f
import mlflow
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error
import pmdarima as pm
from statsmodels.tsa.statespace.sarimax import SARIMAX

def train_model(df_pandas):
  '''
  Trains a model on grouped instances
  '''
  original_df = df_pandas.copy() #the original df will be returned in the end
  PA = df_pandas['Princípio_Ativo'].iloc[0]
  run_id = df_pandas['run_id'].iloc[0] # Pulls run ID to do a nested run
  
  
  observacoes_no_teste = 12
  horizonte = 1
  observacoes_total = len(df_pandas.index)
  observacoes_no_train = len(df_pandas.index) - observacoes_no_teste
  
  try:
    #train test split
    X = df_pandas[:observacoes_no_train]['Demanda']
    y = df_pandas[observacoes_no_train:]['Demanda']

    # Train the model
    model = pm.auto_arima(X, seasonal=True, m=12)

    order = model.get_params()['order']
    seasonal_order = model.get_params()['seasonal_order']


  except:
    # if auto_arima fails, order/seasonal_order stay undefined and the
    # backtesting loop below falls into its own except block
    pass
 
  # Resume the top-level training
  with mlflow.start_run(run_id=run_id, experiment_id=1333367041812290):
    # Create a nested run for the specific device
    with mlflow.start_run(run_name=str(PA), nested=True, experiment_id=1333367041812290) as run:
      
      mae_list = []
      mse_list = []
      previsoes_list = []
      teste_list = []
      predictions_list = []

    
      try:
        #the purpose of the following loop is to do backtesting: the model is trained with n observations, and the (n+1)th is predicted. n is increased by 1 on each iteration.
        for i in range(observacoes_total-observacoes_no_train-horizonte+1):
          #train test split
          X = df_pandas[:observacoes_no_train+i]['Demanda']
          y = df_pandas[observacoes_no_train+i:observacoes_no_train+i+horizonte]['Demanda']
          #train model
          model = SARIMAX(X, order=order, seasonal_order=seasonal_order)
          model = model.fit()
          #make predictions
          predictions = model.predict(start=observacoes_no_train + i, end=(observacoes_no_train + i + horizonte-1))

          predictions_list.append(predictions)

          mse = round(mean_squared_error(y, predictions),2)
          mae = round(mean_absolute_error(y, predictions),2)

          mse_list.append(mse)
          mae_list.append(mae)

        #series with predictions
        in_sample_predictions = pd.concat(predictions_list)
        in_sample_predictions.name = 'in_sample'
        #out of sample predictions
        hp = 3
        out_of_sample_predictions = model.predict(start=observacoes_total, end=(observacoes_total + hp - 1))
        out_of_sample_predictions.name = 'out_sample'
        #in sample + out of sample predictions
        df_predictions = pd.concat([df_pandas.drop('run_id',axis=1), in_sample_predictions,out_of_sample_predictions], axis=1)
        #save df with predictions to be logged as an artifact by mlflow.
        df_predictions.to_csv('df_predictions.csv')

        #mlflow logging
        mlflow.log_param("Princípio_Ativo", PA)
        mlflow.log_param("mae_list", str(mae_list))
        mlflow.log_param("mse_list", str(mse_list))
        mlflow.log_param("status_sucesso", 'sim')
        mlflow.log_artifact('df_predictions.csv')
      except:
        mlflow.log_param("status_falha", 'sim')

  return original_df.drop('run_id', axis=1) 

with mlflow.start_run(run_name="SARIMA", experiment_id=1333367041812290) as run:
  run_id = run.info.run_uuid

  modelDirectoriesDF = (df
    .withColumn("run_id", f.lit(run_id)) # Add run_id
    .groupby("Princípio_Ativo")
    .applyInPandas(train_model, schema=df.schema)
  )
  
combinedDF = (df
  .join(modelDirectoriesDF, on="Princípio_Ativo", how="left")
)

display(combinedDF)

Screenshots from the Spark UI: [Picture 1] [Picture 2] [Picture 3]

asked Nov 06 '22 by marcus


1 Answer

Correct. A stage with only one task is not parallelised. This explains why the runtime does not decrease when you add more cores or nodes to the cluster.

Your input dataset is small (69KB), so unless you explicitly repartition, Spark will read it into a single partition, since it is well below the default maximum partition size of 128MB (dictated by the spark.sql.files.maxPartitionBytes parameter). As such, it will be assigned to a single task.
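
As a quick check, you can print the number of partitions of the input dataframe before the groupby (a minimal sketch, assuming df is the dataframe from the question):

# check how the input is partitioned; 1 partition here is consistent with
# the single task observed in the Spark UI
print(df.rdd.getNumPartitions())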

Repartitioning the input by the device column should provide the parallel training you are looking for.
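
A minimal sketch of that suggestion, reusing the names from the question's code (df, f, run_id, train_model and the Princípio_Ativo grouping column); how many tasks you actually end up with also depends on the cluster's shuffle settings:

# repartition by the grouping column so the per-sensor groups can be
# spread across multiple partitions, and therefore multiple tasks
modelDirectoriesDF = (df
  .withColumn("run_id", f.lit(run_id))           # add run_id as in the question
  .repartition("Princípio_Ativo")                # spread groups across partitions
  .groupby("Princípio_Ativo")
  .applyInPandas(train_model, schema=df.schema)
)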

answered Nov 14 '22 by Chris