I want to parallelize over single examples or batches of examples (in my situation I only have CPUs, up to 112 of them). I tried it, but I hit an error: the losses cannot carry gradients across separate processes, which entirely ruins my attempt. I still want to do this, and it is essential that after the multiprocessing happens I can do an optimizer step. How do I get around it? I made a totally self-contained example:
import torch
import torch.nn as nn
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Dataset, DataLoader
from torch.multiprocessing import Pool

class SimpleDataSet(Dataset):

    def __init__(self, Din, num_examples=23):
        self.x_dataset = [torch.randn(Din) for _ in range(num_examples)]
        # target function is x*x
        self.y_dataset = [x**2 for x in self.x_dataset]

    def __len__(self):
        return len(self.x_dataset)

    def __getitem__(self, idx):
        return self.x_dataset[idx], self.y_dataset[idx]

def get_loss(args):
    x, y, model = args
    y_pred = model(x)
    criterion = nn.MSELoss()
    loss = criterion(y_pred, y)
    return loss

def get_dataloader(D, num_workers, batch_size):
    ds = SimpleDataSet(D)
    dl = DataLoader(ds, batch_size=batch_size, num_workers=num_workers)
    return dl

def train_fake_data():
    num_workers = 2
    Din, Dout = 3, 1
    model = nn.Linear(Din, Dout).share_memory()

    optimizer = torch.optim.Adam(model.parameters(), lr=0.1)

    batch_size = 2
    num_epochs = 10
    # num_batches = 5
    num_procs = 5
    dataloader = get_dataloader(Din, num_workers, batch_size)
    scheduler = StepLR(optimizer, step_size=1, gamma=0.7)
    for epoch in range(num_epochs):
        for _, batch in enumerate(dataloader):
            batch = [(torch.randn(Din), torch.randn(Dout), model) for _ in batch]
            with Pool(num_procs) as pool:
                optimizer.zero_grad()

                losses = pool.map(get_loss, batch)
                loss = torch.mean(losses)
                loss.backward()

                optimizer.step()
        # scheduler
        scheduler.step()

if __name__ == '__main__':
    # start = time.time()
    # train()
    train_fake_data()
    # print(f'execution time: {time.time() - start}')
Error:
Traceback (most recent call last):
File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3427, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-2-ea57e03ba088>", line 1, in <module>
runfile('/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py', wdir='/Users/brando/ML4Coq/playground/multiprocessing_playground')
File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
pydev_imports.execfile(filename, global_vars, local_vars) # execute the script
File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 95, in <module>
train_fake_data()
File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 83, in train_fake_data
losses = pool.map(get_loss, batch)
File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 290, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 683, in get
raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '[tensor(0.5237, grad_fn=<MseLossBackward>)]'. Reason: 'RuntimeError('Cowardly refusing to serialize non-leaf tensor which requires_grad, since autograd does not support crossing process boundaries. If you just want to transfer the data, call detach() on the tensor before serializing (e.g., putting it on the queue).')'
I am sure I want to do this. How should I be doing this?
"""
Based on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
Note: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
import os
num_epochs = 5
batch_size = 8
Din, Dout = 10, 5
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
data = [(i*data_x, i*data_y) for i in range(num_epochs)]
class OneDeviceModel(nn.Module):
"""
Toy example for a model ran in parallel but not distributed accross gpus
(only processes with their own gpu or hardware)
"""
def __init__(self):
super().__init__()
self.net1 = nn.Linear(Din, Din)
self.relu = nn.ReLU()
self.net2 = nn.Linear(Din, Dout)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def setup_process(rank, world_size, backend='gloo'):
"""
Initialize the distributed environment (for each process).
gloo: is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that
it's a library/API for process to communicate/coordinate with each other/master. It's a backend library.
"""
# set up the master's ip address so this child process can coordinate
# os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
if torch.cuda.is_available():
backend = 'nccl'
# Initializes the default distributed process group, and this will also initialize the distributed package.
dist.init_process_group(backend, rank=rank, world_size=world_size)
def cleanup():
""" Destroy a given process group, and deinitialize the distributed package """
dist.destroy_process_group()
def run_parallel_training_loop(rank, world_size):
"""
Distributed function to be implemented later.
This is the function that is actually ran in each distributed process.
Note: as DDP broadcasts model states from rank 0 process to all other processes in the DDP constructor,
you don’t need to worry about different DDP processes start from different model parameter initial values.
"""
print()
print(f"Start running DDP with model parallel example on rank: {rank}.")
print(f'current process: {mp.current_process()}')
print(f'pid: {os.getpid()}')
setup_process(rank, world_size)
# create model and move it to GPU with id rank
model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
# ddp_model = DDP(model, device_ids=[rank])
ddp_model = DDP(model)
for batch_idx, batch in enumerate(data):
x, y = batch
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(x)
labels = y.to(rank) if torch.cuda.is_available() else y
# Gradient synchronization communications take place during the backward pass and overlap with the backward computation.
loss_fn(outputs, labels).backward() # When the backward() returns, param.grad already contains the synchronized gradient tensor.
optimizer.step() # TODO how does the optimizer know to do the gradient step only once?
print()
print(f"Start running DDP with model parallel example on rank: {rank}.")
print(f'current process: {mp.current_process()}')
print(f'pid: {os.getpid()}')
# Destroy a given process group, and deinitialize the distributed package
cleanup()
def main():
print()
print('running main()')
print(f'current process: {mp.current_process()}')
print(f'pid: {os.getpid()}')
# args
world_size = mp.cpu_count()
mp.spawn(run_parallel_training_loop, args=(world_size,), nprocs=world_size)
if __name__ == "__main__":
print('starting __main__')
main()
print('Done!\a\n')
It seems to work, but my question is about the line where the model is created: do I need to do this
model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
or
model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel()
for it to work properly across multiple CPUs?
My current issue is that the CPU-parallel job is slower than the serial one when only CPUs are available.
I also want to know how to set up Python and parallel CPUs in general. E.g., if I have X CPUs, how many processes should I be running? X? Something else? How do I choose this number, even if only as a rough heuristic?
Related links and quotes from my research (a rough sketch of the first suggestion appears right after these quotes):
Python does not have true parallelism within any given process. You would have to spawn a ProcessPool and make the inside of your loop a function taking batch_index, mask_batch, then map that function over the mask object in your current for loop. Thing is, I don't know if PyTorch will play nicely with this.
Finally, we’ll pull all of these together and see a full PyTorch training loop in action. The Dataset and DataLoader classes encapsulate the process of pulling your data from storage and exposing it to your training loop in batches. The Dataset is responsible for accessing and processing single instances of data.
We have already described how PyTorch exploits GPU parallelism. If a server has multiple cores or if you have a cluster of servers available, it is also possible to use MPI-like message passing style to coordinate multiple thread of computation. One must create a master process that forks off child processes that do the work.
We’ll get familiar with the dataset and dataloader abstractions, and how they ease the process of feeding data to your model during a training loop We’ll look at PyTorch optimizers, which implement algorithms to adjust model weights based on the outcome of a loss function
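Following up on the first quote above, here is a minimal, hedged sketch of how the Pool idea could sidestep the serialization error: instead of returning losses that still carry a graph, each worker runs backward() locally and returns detached per-example gradients; the parent averages them (the gradient of the mean loss equals the mean of the per-example gradients), writes them into param.grad, and steps the optimizer. The helper names compute_grads and apply_averaged_grads are my own illustrative ones, not from the original post.

import torch
import torch.nn as nn
from torch.multiprocessing import Pool

def compute_grads(args):
    # worker: forward + backward on one example, return detached gradients
    x, y, model = args
    model.zero_grad()
    loss = nn.MSELoss()(model(x), y)
    loss.backward()
    # detach/clone so the tensors can cross the process boundary
    return [p.grad.detach().clone() for p in model.parameters()]

def apply_averaged_grads(model, grads_per_example, optimizer):
    # parent: average the per-example gradients, write them into .grad, then step
    for p, *gs in zip(model.parameters(), *grads_per_example):
        p.grad = torch.stack(gs).mean(dim=0)
    optimizer.step()

if __name__ == '__main__':
    Din, Dout, num_procs = 3, 1, 4
    model = nn.Linear(Din, Dout).share_memory()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.1)
    batch = [(torch.randn(Din), torch.randn(Dout), model) for _ in range(8)]
    with Pool(num_procs) as pool:
        grads_per_example = pool.map(compute_grads, batch)
    apply_averaged_grads(model, grads_per_example, optimizer)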
Torch will use multiple CPUs to parallelize operations, so your serial version may already be using multi-core parallelism.
Take this simple example:
import torch

c = 0
for i in range(10000):
    A = torch.randn(1000, 1000, device='cpu')
    B = torch.randn(1000, 1000, device='cpu')
    c += torch.sum(A @ B)
No code is written to parallelize it explicitly, yet with the default configuration it uses about 80% of 12 CPUs.
You can use torch.set_num_threads to set the intra-op parallelism on CPU. In particular, if you are running multiple processes and you want each process to use a single CPU, you may want to set the intra-op parallelism to 1 in each process.
However, parallelizing the operations has a cost. I can't go into the implementation details, but we can run a quick experiment that shows the overhead of using multiple threads.
import matplotlib.pyplot as plt
import numpy as np
import torch
import time

A = torch.randn(1000, 1000, device='cpu')
B = torch.randn(1000, 1000, device='cpu')
funcs = {
    'sin': lambda a, b: torch.sin(A),
    'tanh': lambda a, b: torch.tanh(A),
    'log': lambda a, b: torch.log(A),
    'matmul': lambda a, b: A @ B.T
}
t = np.zeros(20)
for k, f in funcs.items():
    for i in range(1, len(t) + 1):
        torch.set_num_threads(i)
        c = 0
        t0 = time.time()
        for _ in range(100):
            f(A, B)
        tf = time.time()
        t[i-1] = (tf - t0) * i
    plt.plot(np.arange(1, len(t) + 1), t, '-o', label=k)
plt.xlabel('Number of threads')
plt.legend()
plt.ylabel('Core x time')
The operations tend to run faster (in wall-clock time) with more threads.
But if we look at the total CPU time, multiplying the wall-clock time by the number of threads, we see that the single-thread version is more efficient.
If you are able to parallelize your experiment at a higher level, by running independent processes, you should try that with a single core for each process; otherwise each process will try to use all the CPUs and all of them will run very slowly because your system is overloaded.
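As a small, hedged illustration of that advice (my own sketch, not from the original answer): spawn independent workers with torch.multiprocessing and have each one restrict itself to a single intra-op thread before doing its work.

import torch
import torch.multiprocessing as mp

def worker(rank):
    # limit this process to one intra-op thread so the workers do not compete for cores
    torch.set_num_threads(1)
    A = torch.randn(1000, 1000)
    B = torch.randn(1000, 1000)
    print(f'worker {rank}: {torch.sum(A @ B).item():.2f}')

if __name__ == '__main__':
    num_procs = 4  # roughly one process per core you want to dedicate
    mp.spawn(worker, nprocs=num_procs)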
I intentionally modified the hyperparameters of your example script in a way that weighs in favor of the multi-process setup.
"""
Based on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
Note: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
import argparse
import os
# More than one epoch so that the initialization is less significant
# than compared to the model processing time
num_epochs = 10
# for the experiment select a number that has a lot of divisors
# as I want to test with equal number of batches
num_batches = 16*9*5
# Uses a larger batch so that more work is done in each process
# between two gradient synchronizations
# apparently the intraop optimization is not helping
# (at least not too much) in the batch dimension
batch_size = 10000
# Use smaller dimensions, so that the intraop parallelization becomes less
# helpful
Din, Dout = 3, 5
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
data = [(i*data_x, i*data_y) for i in range(num_batches)]
class OneDeviceModel(nn.Module):
"""
Toy example for a model ran in parallel but not distributed accross gpus
(only processes with their own gpu or hardware)
"""
def __init__(self):
super().__init__()
# -- Use more layers
self.net = [nn.Linear(Din, Din) for _ in range(10)]
# -- Bob: use more complex activation
self.tanh = nn.Tanh()
self.sigmoid = nn.Sigmoid()
self.relu = nn.ReLU()
self.net2 = nn.Linear(Din, Dout)
def forward(self, x):
# apply the 10 layers sequentially
for i in range(10):
x = self.net[i](x)
x = self.sigmoid(x)
x = self.tanh(x)
x = self.relu(x)
return self.net2(x)
def setup_process(rank, world_size, backend='gloo'):
"""
Initialize the distributed environment (for each process).
gloo: is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that
it's a library/API for process to communicate/coordinate with each other/master. It's a backend library.
"""
# set up the master's ip address so this child process can coordinate
# os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
if torch.cuda.is_available():
backend = 'nccl'
# Initializes the default distributed process group, and this will also initialize the distributed package.
dist.init_process_group(backend, rank=rank, world_size=world_size)
def cleanup():
""" Destroy a given process group, and deinitialize the distributed package """
dist.destroy_process_group()
def run_parallel_training_loop(rank, world_size):
"""
Distributed function to be implemented later.
This is the function that is actually ran in each distributed process.
Note: as DDP broadcasts model states from rank 0 process to all other processes in the DDP constructor,
you don’t need to worry about different DDP processes start from different model parameter initial values.
"""
print()
print(f"Start running DDP with model parallel example on rank: {rank}.")
print(f'current process: {mp.current_process()}')
print(f'pid: {os.getpid()}')
setup_process(rank, world_size)
torch.set_num_threads(mp.cpu_count() // world_size)
# create model and move it to GPU with id rank
model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
# ddp_model = DDP(model, device_ids=[rank])
ddp_model = DDP(model)
for _ in range(num_epochs):
for batch_idx, batch in enumerate(data[rank::world_size]):
x, y = batch
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(x)
labels = y.to(rank) if torch.cuda.is_available() else y
# Gradient synchronization communications take place during the backward pass and overlap with the backward computation.
loss_fn(outputs, labels).backward() # When the backward() returns, param.grad already contains the synchronized gradient tensor.
optimizer.step() # TODO how does the optimizer know to do the gradient step only once?
print()
print(f"Start running DDP with model parallel example on rank: {rank}.")
print(f'current process: {mp.current_process()}')
print(f'pid: {os.getpid()}')
# Destroy a given process group, and deinitialize the distributed package
cleanup()
def main():
print()
print('running main()')
print(f'current process: {mp.current_process()}')
print(f'pid: {os.getpid()}')
parser = argparse.ArgumentParser()
parser.add_argument('--world-size', default=1, type=int)
args = parser.parse_args()
assert num_batches % args.world_size == 0
mp.spawn(run_parallel_training_loop, args=(args.world_size,), nprocs=args.world_size)
if __name__ == "__main__":
print('starting __main__')
main()
print('Done!\a\n')
$ time python3 ddp.py --world-size 1 > /dev/null
real 0m59.092s
user 8m46.589s
sys 0m7.320s
$ time python3 ddp.py --world-size 1 > /dev/null
real 1m11.124s
user 10m54.209s
sys 0m9.595s
$ time python3 ddp.py --world-size 6 > /dev/null
real 0m18.348s
user 2m28.799s
sys 0m18.068s
$ time python3 ddp.py --world-size 12 > /dev/null
real 0m26.352s
user 4m3.074s
sys 0m39.179s
$ time python3 ddp.py --world-size 3 > /dev/null
real 0m23.047s
user 3m51.172s
sys 0m11.483s
$ time python3 ddp.py --world-size 4 > /dev/null
real 0m18.195s
user 2m55.241s
sys 0m12.841s
$ time python3 ddp.py --world-size 2 > /dev/null
real 0m26.955s
user 4m15.837s
sys 0m7.127s
If I remove the line
torch.set_num_threads(mp.cpu_count() // world_size)
$ time python3 ddp.py --world-size 4 > /dev/null
real 0m40.574s
user 6m39.176s
sys 0m19.025s
$ time python3 ddp.py --world-size 2 > /dev/null
real 0m28.066s
user 3m17.775s
sys 0m8.410s
$ time python3 ddp.py --world-size 1 > /dev/null
real 0m37.114s
user 2m19.743s
sys 0m4.866s
Using
torch.set_num_threads(mp.cpu_count() // world_size // 2)
$ time python3 ddp.py --world-size 6 > /dev/null
real 0m16.399s
user 1m38.915s
sys 0m20.780s
$ time python3 ddp.py --world-size 4 > /dev/null
real 0m15.649s
user 1m1.821s
sys 0m13.589s
$ time python3 ddp.py --world-size 3 > /dev/null
real 0m16.947s
user 1m29.696s
sys 0m10.069s
$ time python3 ddp.py --world-size 2 > /dev/null
real 0m21.851s
user 2m4.564s
sys 0m7.486s
DDP on a single node does not seem particularly advantageous, unless you have a model that does a lot of work that is not handled well by PyTorch's intra-op parallelism, you use large batches, and preferably a model with fewer parameters and more operations, meaning fewer gradients to synchronize, e.g. a convolutional model on a very large input.
Another scenario where DDP might help is if you are using too much Python in your model, instead of vectorized operations.
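To make that last point concrete, here is a hedged toy sketch (my own, not the answerer's) of what "too much Python" looks like: a forward pass that issues one tiny Torch op per sample in a Python loop leaves intra-op threads with almost nothing to do, so splitting the work across processes is the main remaining speedup, whereas the vectorized variant already parallelizes internally.

import torch
import torch.nn as nn

class LoopyModel(nn.Module):
    # "too much Python": one tiny op per sample, dominated by interpreter overhead
    def __init__(self, din, dout):
        super().__init__()
        self.lin = nn.Linear(din, dout)

    def forward(self, x):
        return torch.stack([self.lin(row) for row in x])

class VectorizedModel(nn.Module):
    # vectorized: one large matmul that PyTorch can already parallelize across cores
    def __init__(self, din, dout):
        super().__init__()
        self.lin = nn.Linear(din, dout)

    def forward(self, x):
        return self.lin(x)

if __name__ == '__main__':
    x = torch.randn(10000, 3)
    print(LoopyModel(3, 5)(x).shape, VectorizedModel(3, 5)(x).shape)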