I was wondering what the proper way of logging metrics is when using DDP. I noticed that if I print something inside validation_epoch_end, it is printed twice when using 2 GPUs. I was expecting validation_epoch_end to be called only on rank 0 and to receive the outputs from all GPUs, but I am no longer sure this is correct. Therefore I have several questions:
- validation_epoch_end(self, outputs): When using DDP, does every subprocess receive the data processed on the current GPU or the data processed on all GPUs, i.e. does the input parameter outputs contain the outputs of the entire validation set, from all GPUs?
- If outputs is GPU/process specific, what is the proper way to calculate any metric on the entire validation set in validation_epoch_end when using DDP?

I understand that I can solve the printing by checking self.global_rank == 0 and printing/logging only in that case; however, I am trying to get a deeper understanding of what I am printing/logging in this case.
Here is a code snippet from my use case. I would like to be able to report f1, precision and recall on the entire validation dataset and I am wondering what is the correct way of doing it when using DDP.
    def _process_epoch_outputs(self,
                               outputs: List[Dict[str, Any]]
                               ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Creates and returns tensors containing all labels and predictions

        Goes over the outputs accumulated from every batch, detaches the
        necessary tensors and stacks them together.

        Args:
            outputs (List[Dict])
        """
        all_labels = []
        all_predictions = []

        for output in outputs:
            for labels in output['labels'].detach():
                all_labels.append(labels)

            for predictions in output['predictions'].detach():
                all_predictions.append(predictions)

        all_labels = torch.stack(all_labels).long().cpu()
        all_predictions = torch.stack(all_predictions).cpu()

        return all_predictions, all_labels

    def validation_epoch_end(self, outputs: List[Dict[str, Any]]) -> None:
        """Logs f1, precision and recall on the validation set."""

        if self.global_rank == 0:
            print(f'Validation Epoch: {self.current_epoch}')

        predictions, labels = self._process_epoch_outputs(outputs)
        for i, name in enumerate(self.label_columns):

            f1, prec, recall, t = metrics.get_f1_prec_recall(predictions[:, i],
                                                             labels[:, i],
                                                             threshold=None)
            self.logger.experiment.add_scalar(f'{name}_f1/Val',
                                              f1,
                                              self.current_epoch)
            self.logger.experiment.add_scalar(f'{name}_Precision/Val',
                                              prec,
                                              self.current_epoch)
            self.logger.experiment.add_scalar(f'{name}_Recall/Val',
                                              recall,
                                              self.current_epoch)

            if self.global_rank == 0:
                print((f'F1: {f1}, Precision: {prec}, '
                       f'Recall: {recall}, Threshold {t}'))
validation_epoch_end(self, outputs) - When using DDP, does every subprocess receive the data processed on the current GPU or the data processed on all GPUs, i.e. does the input parameter outputs contain the outputs of the entire validation set, from all GPUs?
Data processed from the current GPU only. Outputs are not synchronized; there is only backward synchronization (gradients are synchronized during training and distributed to the replicas of the model residing on each GPU).
Imagine that all of the outputs were passed from 1000 GPUs to this poor master: it could very easily run out of memory (OOM).
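To make "data processed from the current GPU only" concrete, here is a minimal pure-Python sketch of how a validation set might be split across processes. It assumes a simplified round-robin split with no shuffling or padding, similar in spirit to what a distributed sampler does; the helper name shard_indices is hypothetical and not part of any library.

```python
from typing import List


def shard_indices(dataset_size: int, world_size: int, rank: int) -> List[int]:
    """Return the sample indices one process (rank) would see.

    Hypothetical helper: a simplified round-robin split, without the
    shuffling or padding that a real distributed sampler may apply.
    """
    return list(range(rank, dataset_size, world_size))


if __name__ == "__main__":
    dataset_size, world_size = 10, 2
    for rank in range(world_size):
        # Each rank's `outputs` would only cover these samples.
        print(f"rank {rank}: {shard_indices(dataset_size, world_size, rank)}")
```

Under this assumption, each rank's outputs cover a disjoint part of the validation set, so a metric computed from one rank's outputs describes only that part.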
If outputs is GPU/process specific what is the proper way to calculate any metric on the entire validation set in validation_epoch_end when using DDP?
According to documentation (emphasis mine):
When validating using a accelerator that splits data from each batch across GPUs, sometimes you might need to aggregate them on the master GPU for processing (dp, or ddp2).
And here is the accompanying code (validation_epoch_end would receive data accumulated across multiple GPUs from a single step in this case; also see the comments):
    import torch.nn.functional as F

    # Done per-process (GPU)
    def validation_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self.model(x)
        loss = F.cross_entropy(y_hat, y)
        pred = ...
        return {'loss': loss, 'pred': pred}

    # Gathered data from all processes (per single step)
    # Allows for accumulation so the whole data at the end of epoch
    # takes less memory
    def validation_step_end(self, batch_parts):
        # Index per GPU first, consistent with the loss access below
        gpu_0_prediction = batch_parts[0]['pred']
        gpu_1_prediction = batch_parts[1]['pred']

        # do something with both outputs
        return (batch_parts[0]['loss'] + batch_parts[1]['loss']) / 2

    def validation_epoch_end(self, validation_step_outputs):
        for out in validation_step_outputs:
            # do something with preds
            ...
Focus on per-device calculations and as few between-GPU transfers as possible:
- In validation_step (or training_step if that's what you want, this is general), calculate f1, precision, recall and whatever else on a per-batch basis.
- Return 3 numbers from each device instead of (batch, outputs) (which could be significantly larger).
- In validation_step_end, get those 3 values (actually (2, 3) if you have 2 GPUs), sum them or take their mean, and return 3 values.
- validation_epoch_end will then get (steps, 3) values that you can use to accumulate.

It would be even better if, instead of operating on a list of values during validation_epoch_end, you could accumulate them in another 3 values (say you have a lot of validation steps, the list could grow too large), but this should be enough.
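One caveat when choosing between "sum" and "take the mean" in the steps above: the mean of per-batch F1 scores is generally not the F1 of the whole validation set, whereas summed true-positive, false-positive and false-negative counts reproduce it exactly. The following is a minimal pure-Python sketch for binary labels; the helper names counts and f1_from_counts are hypothetical, and the two tiny batches are made up for illustration.

```python
from typing import List, Tuple


def counts(preds: List[int], labels: List[int]) -> Tuple[int, int, int]:
    """Hypothetical helper: (TP, FP, FN) for binary 0/1 predictions."""
    tp = sum(1 for p, y in zip(preds, labels) if p == 1 and y == 1)
    fp = sum(1 for p, y in zip(preds, labels) if p == 1 and y == 0)
    fn = sum(1 for p, y in zip(preds, labels) if p == 0 and y == 1)
    return tp, fp, fn


def f1_from_counts(tp: int, fp: int, fn: int) -> float:
    """F1 = 2*TP / (2*TP + FP + FN), defined as 0.0 when undefined."""
    denom = 2 * tp + fp + fn
    return 2 * tp / denom if denom else 0.0


# Two illustrative batches (made-up data): (predictions, labels)
batches = [
    ([1, 1, 0, 0], [1, 0, 0, 0]),
    ([1, 0, 0, 0], [1, 1, 1, 0]),
]

# Option 1: average the per-batch F1 scores.
per_batch_f1 = [f1_from_counts(*counts(p, y)) for p, y in batches]
mean_of_f1 = sum(per_batch_f1) / len(per_batch_f1)

# Option 2: sum the counts first, then compute F1 once.
total_tp = sum(counts(p, y)[0] for p, y in batches)
total_fp = sum(counts(p, y)[1] for p, y in batches)
total_fn = sum(counts(p, y)[2] for p, y in batches)
f1_of_totals = f1_from_counts(total_tp, total_fp, total_fn)

if __name__ == "__main__":
    print(f"mean of per-batch F1: {mean_of_f1:.4f}")
    print(f"F1 from summed counts: {f1_of_totals:.4f}")
```

If an exact dataset-level F1, precision and recall are needed, the 3 values passed between steps could be these counts rather than the ratios themselves.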
AFAIK PyTorch-Lightning doesn't do this (e.g. instead of adding to a list, applying some accumulator directly), but I might be mistaken, so any correction would be great.
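As an illustration of what such an accumulator could look like, here is a minimal pure-Python sketch, not something taken from PyTorch-Lightning. The class BinaryCounts and its methods are hypothetical names. Each process keeps three running counts instead of a growing list, and the merge step stands in for whatever cross-process sum is used in practice (in real DDP this would presumably be a collective such as torch.distributed.all_reduce over a tensor holding the three counts).

```python
from dataclasses import dataclass
from typing import Iterable, Tuple


@dataclass
class BinaryCounts:
    """Hypothetical running accumulator for binary 0/1 predictions."""
    tp: int = 0
    fp: int = 0
    fn: int = 0

    def update(self, preds: Iterable[int], labels: Iterable[int]) -> None:
        """Add one batch to the running counts (constant memory)."""
        for p, y in zip(preds, labels):
            if p == 1 and y == 1:
                self.tp += 1
            elif p == 1 and y == 0:
                self.fp += 1
            elif p == 0 and y == 1:
                self.fn += 1

    def merge(self, other: "BinaryCounts") -> "BinaryCounts":
        """Combine counts from two processes; simulates a cross-process sum."""
        return BinaryCounts(self.tp + other.tp,
                            self.fp + other.fp,
                            self.fn + other.fn)

    def compute(self) -> Tuple[float, float, float]:
        """Return (f1, precision, recall), using 0.0 where undefined."""
        precision = self.tp / (self.tp + self.fp) if self.tp + self.fp else 0.0
        recall = self.tp / (self.tp + self.fn) if self.tp + self.fn else 0.0
        total = precision + recall
        f1 = 2 * precision * recall / total if total else 0.0
        return f1, precision, recall


if __name__ == "__main__":
    # Simulate two ranks, each updating its own accumulator per batch.
    rank0, rank1 = BinaryCounts(), BinaryCounts()
    rank0.update([1, 1, 0, 0], [1, 0, 0, 0])
    rank0.update([1, 0], [1, 1])
    rank1.update([0, 1, 1], [1, 1, 0])
    print(rank0.merge(rank1).compute())
```

With this shape, update would be called once per validation step, and only the three integers need to cross process boundaries at the end of the epoch.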