Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to parallelize a training loop over the samples of a batch when only CPUs are available in PyTorch?

I want to parallelize over single examples or batches of examples (in my situation I only have CPUs; I have up to 112). I tried it, but I get an error saying that the losses cannot carry their gradients across separate processes (which entirely ruins my attempt). I still want to do it, and it is essential that after the multiprocessing happens I can do an optimizer step. How do I get around it? I made a totally self-contained example:


import torch
import torch.nn as nn
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Dataset, DataLoader

from torch.multiprocessing import Pool

class SimpleDataSet(Dataset):
    """Toy regression dataset: random input vectors paired with their element-wise squares."""

    def __init__(self, Din, num_examples=23):
        """Draw ``num_examples`` standard-normal vectors of size ``Din`` and build their targets."""
        inputs = []
        for _ in range(num_examples):
            inputs.append(torch.randn(Din))
        self.x_dataset = inputs
        # the function to learn is y = x * x (element-wise)
        self.y_dataset = [sample ** 2 for sample in inputs]

    def __len__(self):
        """Return the number of stored examples."""
        return len(self.x_dataset)

    def __getitem__(self, idx):
        """Return the ``(input, target)`` pair stored at position ``idx``."""
        features = self.x_dataset[idx]
        target = self.y_dataset[idx]
        return features, target

def get_loss(args):
    """
    Compute the mean-squared-error loss for one ``(x, y, model)`` work item.

    The three values arrive packed in a single tuple, which lets the function
    be passed directly to a ``map``-style call.
    """
    inputs, targets, net = args
    prediction = net(inputs)
    return nn.MSELoss()(prediction, targets)

def get_dataloader(D, num_workers, batch_size):
    """Build a ``DataLoader`` over a freshly generated ``SimpleDataSet`` with input size ``D``."""
    return DataLoader(
        SimpleDataSet(D),
        batch_size=batch_size,
        num_workers=num_workers,
    )

def train_fake_data():
    """
    Attempt to train a linear model by computing per-example losses in a process pool.

    This is the reproduction case for the question: the worker function returns a
    loss tensor that is still attached to the autograd graph, and sending that
    tensor back to the parent process is what raises the serialization error.
    """
    num_workers = 2  # worker processes used by the DataLoader
    Din, Dout = 3, 1
    # share_memory() is called so that the pool workers can use the same parameters
    model = nn.Linear(Din, Dout).share_memory()

    optimizer = torch.optim.Adam(model.parameters(), lr=0.1)

    batch_size = 2
    num_epochs = 10
    # num_batches = 5
    num_procs = 5  # size of the process pool that evaluates the losses
    dataloader = get_dataloader(Din, num_workers, batch_size)
    scheduler = StepLR(optimizer, step_size=1, gamma=0.7)
    for epoch in range(num_epochs):
        for _, batch in enumerate(dataloader):
            # The loaded values are discarded: one random (x, y, model) work item is
            # created per element of the loaded batch.
            batch = [(torch.randn(Din), torch.randn(Dout), model) for _ in batch]
            # A new pool is created (and torn down) for every single batch.
            with Pool(num_procs) as pool:
                optimizer.zero_grad()

                # Each get_loss result has a grad_fn; pickling it to send it back from
                # the worker is refused because autograd cannot cross process boundaries.
                losses = pool.map(get_loss, batch)
                # NOTE(review): pool.map returns a Python list, while torch.mean expects
                # a tensor; this would presumably need torch.stack first (confirm).
                loss = torch.mean(losses)
                loss.backward()

                optimizer.step()
            # The scheduler is stepped inside the batch loop, so the learning rate is
            # multiplied by gamma after every batch rather than after every epoch.
            scheduler.step()


# Entry-point guard: training only starts when the file is executed as a script,
# not when the module is merely imported.
if __name__ == '__main__':
    # start = time.time()
    # train()
    train_fake_data()
    # print(f'execution time: {time.time() - start}')

Error:

Traceback (most recent call last):
  File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3427, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-2-ea57e03ba088>", line 1, in <module>
    runfile('/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py', wdir='/Users/brando/ML4Coq/playground/multiprocessing_playground')
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
    pydev_imports.execfile(filename, global_vars, local_vars)  # execute the script
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 95, in <module>
    train_fake_data()
  File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 83, in train_fake_data
    losses = pool.map(get_loss, batch)
  File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 290, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 683, in get
    raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '[tensor(0.5237, grad_fn=<MseLossBackward>)]'. Reason: 'RuntimeError('Cowardly refusing to serialize non-leaf tensor which requires_grad, since autograd does not support crossing process boundaries.  If you just want to transfer the data, call detach() on the tensor before serializing (e.g., putting it on the queue).')'

I am sure I want to do this. How should I be doing this?


New attempt using DDP

"""
Based on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

Note: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

import os

# Toy problem configuration, defined at module level so the training function can read it.
num_epochs = 5
batch_size = 8
Din, Dout = 10, 5  # input / output feature sizes of the model
# A single random batch of inputs and targets ...
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
# ... reused at different scales: entry i is (i*data_x, i*data_y), so entry 0 is all zeros.
# The list has num_epochs entries, i.e. each "epoch" here consists of exactly one batch.
# NOTE(review): the random draws are unseeded; if every spawned worker re-imports this
# module, each process presumably ends up with different data (confirm).
data = [(i*data_x, i*data_y) for i in range(num_epochs)]

class OneDeviceModel(nn.Module):
    """
    Toy two-layer network (Linear -> ReLU -> Linear) that fits on a single device.

    The model is meant to be replicated across parallel processes, each with its
    own GPU or other hardware; the model itself is not split across GPUs.
    """
    def __init__(self):
        super().__init__()
        self.net1 = nn.Linear(Din, Din)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(Din, Dout)

    def forward(self, x):
        """Map inputs with ``Din`` features to outputs with ``Dout`` features."""
        hidden = self.net1(x)
        activated = self.relu(hidden)
        return self.net2(activated)

def setup_process(rank, world_size, backend='gloo'):
    """
    Join the default distributed process group (to be called in each process).

    The master address and port are fixed to localhost:12355. ``gloo``
    (https://github.com/facebookincubator/gloo) is a collective communications
    library that the processes use to communicate/coordinate with each other; it is
    the default backend, but ``nccl`` is used instead whenever CUDA is available.
    """
    # tell this process where the master lives so it can coordinate with it
    os.environ.update(MASTER_ADDR='localhost', MASTER_PORT='12355')

    # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
    chosen_backend = 'nccl' if torch.cuda.is_available() else backend
    # creates the default process group, which also initializes the distributed package
    dist.init_process_group(chosen_backend, rank=rank, world_size=world_size)

def cleanup():
    """Tear down the process group of this process and deinitialize the distributed package."""
    dist.destroy_process_group()

def run_parallel_training_loop(rank, world_size):
    """
    Train one DDP replica; this is the function that is actually run in each distributed process.

    Args:
        rank: index of this process in the group; also used as the GPU id when CUDA is available.
        world_size: total number of processes taking part in the training.

    Note: as DDP broadcasts model states from rank 0 process to all other processes in the DDP constructor,
    you don’t need to worry about different DDP processes start from different model parameter initial values.
    """
    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    setup_process(rank, world_size)

    # create model and move it to GPU with id rank
    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
    # ddp_model = DDP(model, device_ids=[rank])
    ddp_model = DDP(model)

    # The loss and the optimizer do not depend on the batch, so they are built once.
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
    on_gpu = torch.cuda.is_available()

    for x, y in data:
        if on_gpu:
            # inputs AND labels must live on the same device as the model replica
            x, y = x.to(rank), y.to(rank)

        optimizer.zero_grad()
        outputs = ddp_model(x)
        # Gradient synchronization communications take place during the backward pass and overlap with the backward computation.
        loss_fn(outputs, y).backward()  # When the backward() returns, param.grad already contains the synchronized gradient tensor.
        # Every process performs its own step; because all replicas hold the same
        # synchronized gradients, they all apply the same update and stay identical.
        optimizer.step()

    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # Destroy a given process group, and deinitialize the distributed package
    cleanup()

def main():
    """Report the parent process, then spawn one training process per CPU core."""
    print(
        '',
        'running main()',
        f'current process: {mp.current_process()}',
        f'pid: {os.getpid()}',
        sep='\n',
    )
    # one worker process for every core reported by the system
    num_processes = mp.cpu_count()
    mp.spawn(run_parallel_training_loop, args=(num_processes,), nprocs=num_processes)

# Run the demo only when the file is executed as a script, not when it is imported.
if __name__ == "__main__":
    print('starting __main__')
    main()
    # '\a' is the terminal bell character, giving an audible cue on completion
    print('Done!\a\n')

It seems to work, but my question is about line 74: do I need to do this

    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()

or

    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel()

for it to work properly in multiple CPUs?


Serial is faster than parallel even if I have 112 cpu cores?

My current issue is that the cpu parallel job is slower than the serially running one when only cpus are available.


I want to know how to set up Python and parallel CPUs, e.g. if I have X CPUs, how many processes should I be running: X, or something else? How do I choose this number, even if it's only a rough heuristic?


related links from research:

  • https://discuss.pytorch.org/t/multiprocessing-for-loop-on-cpu/59836
  • How to use multiprocessing in PyTorch?
  • https://discuss.pytorch.org/t/how-to-parallelize-a-loop-over-the-samples-of-a-batch/32698/7
  • https://www.reddit.com/r/pytorch/comments/sm073v/how_to_parallelize_a_training_loop_ever_samples/
like image 786
Charlie Parker Avatar asked Feb 16 '21 14:02

Charlie Parker


People also ask

Is it possible to parallelize Python for loops?

Python does not have true parallelism within any given process. You would have to spawn a ProcessPool and make the inside of your loop a function taking batch_index, mask_batch, then map that function over the mask object in your current for loop. Thing is, I don't know if PyTorch will play nicely with this.

How do PyTorch training loops work?

Finally, we’ll pull all of these together and see a full PyTorch training loop in action. The Dataset and DataLoader classes encapsulate the process of pulling your data from storage and exposing it to your training loop in batches. The Dataset is responsible for accessing and processing single instances of data.

How does PyTorch exploit GPU parallelism?

We have already described how PyTorch exploits GPU parallelism. If a server has multiple cores or if you have a cluster of servers available, it is also possible to use MPI-like message passing style to coordinate multiple thread of computation. One must create a master process that forks off child processes that do the work.

What is a PyTorch optimizer?

We’ll get familiar with the dataset and dataloader abstractions, and how they ease the process of feeding data to your model during a training loop We’ll look at PyTorch optimizers, which implement algorithms to adjust model weights based on the outcome of a loss function


1 Answers

Torch will use multiple CPUs to parallelize operations, so your serial version may already be using multi-core vectorization.

Take this simple example

import torch
# Accumulate the sum of many random 1000x1000 matrix products.
# Nothing here asks for parallelism explicitly.
c = 0
for i in range(10000):
    A = torch.randn(1000, 1000, device='cpu')
    B = torch.randn(1000, 1000, device='cpu')
    c += (A @ B).sum()

No code is used to parallelize, yet it uses 80% of 12 CPUs with the default configuration.

enter image description here

You can use torch.set_num_threads to set intraop parallelism on CPU. In particular if you are running multiple process and you want each process to use a single CPU you may want to set in each process the intraop parallelism to 1.

However, parallelizing the operations has a cost. I am unable to go into the implementation details, but we can run a quick experiment that shows the overhead of using multiple threads.

import matplotlib.pyplot as plt
import numpy as np
import torch;
import time;
# Benchmark: for 1..20 intra-op threads, time 100 calls of a few element-wise
# operations and a matmul, then plot "core x time" (elapsed time multiplied by
# the number of threads) for each operation.
A = torch.randn(1000, 1000, device='cpu')
B = torch.randn(1000, 1000, device='cpu')
# Every entry uses the operands it is given instead of silently reading the globals.
funcs = {
    'sin': lambda a, b: torch.sin(a),
    'tanh': lambda a, b: torch.tanh(a),
    'log': lambda a, b: torch.log(a),
    'matmul': lambda a, b: a @ b.T
}
max_threads = 20
for k, f in funcs.items():
    # A fresh array per operation, so a curve that was already plotted can never
    # be affected by later in-place writes.
    t = np.zeros(max_threads)
    for i in range(1, max_threads + 1):
        torch.set_num_threads(i)
        # perf_counter is the clock intended for measuring short intervals
        t0 = time.perf_counter()
        for _ in range(100):
            f(A, B)
        tf = time.perf_counter()
        # elapsed time scaled by the thread count = total core time spent
        t[i-1] = (tf - t0)*i
    plt.plot(np.arange(1, max_threads + 1), t, '-o', label=k)
plt.xlabel('Number of threads')
plt.legend()
plt.ylabel('Core x time')

The operations tend to run faster with parallelism: enter image description here

But if we take the total CPU time, by multiplying by the number of threads, we see that the single thread version is more efficient.

enter image description here

If you are able to parallelize your experiment at a higher level, by running independent processes, you should try that with a single core for each process, otherwise each process will try to use all the CPUs and all of them will run very slowly because your system is overloaded.

Tweaking DDP example

I intentionally modified the hyperparameters of your example script in a way that weighs in favor of multiple processes.

  • comparably less initialization overhead
  • comparably less communication between processes
"""
Based on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

Note: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
import argparse
import os

# Several epochs, so that the one-off initialization cost becomes small
# compared to the time spent processing the model
num_epochs = 10
# For the experiment pick a number with many divisors, so that the batches
# can be split evenly among different numbers of processes
num_batches = 16*9*5
# Use a larger batch so that more work is done in each process
# between two gradient synchronizations;
# apparently the intraop optimization is not helping
# (at least not too much) in the batch dimension
batch_size = 10000
# Use smaller dimensions, so that the intraop parallelization becomes less
# helpful
Din, Dout = 3, 5
# One random batch of inputs and targets, reused at different scales:
# entry i of `data` is (i*data_x, i*data_y), so entry 0 is all zeros.
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
data = [(i*data_x, i*data_y) for i in range(num_batches)]

class OneDeviceModel(nn.Module):
    """
    Toy example for a model run in parallel but not distributed across gpus
    (only processes with their own gpu or hardware).

    Ten ``Linear(Din, Din)`` layers, each followed by sigmoid, tanh and ReLU,
    and a final ``Linear(Din, Dout)`` output layer.
    """
    def __init__(self):
        super().__init__()
        # -- Use more layers
        # The layers are held in an nn.ModuleList rather than a plain Python list:
        # only then are their parameters registered with the module, i.e. returned
        # by .parameters(), updated by the optimizer, synchronized by DDP and moved
        # by .to(device).
        self.net = nn.ModuleList([nn.Linear(Din, Din) for _ in range(10)])
        # -- Bob: use more complex activation
        self.tanh = nn.Tanh()
        self.sigmoid = nn.Sigmoid()
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(Din, Dout)

    def forward(self, x):
        """Apply the ten hidden layers sequentially, then the output layer."""
        for layer in self.net:
            x = layer(x)
            x = self.sigmoid(x)
            x = self.tanh(x)
            x = self.relu(x)
        return self.net2(x)

def setup_process(rank, world_size, backend='gloo'):
    """
    Initialize the distributed environment for the calling process.

    The rendezvous address is fixed to localhost:12355. ``gloo``
    (https://github.com/facebookincubator/gloo) is a collective communications
    library used as the default backend for the processes to communicate and
    coordinate; when CUDA is available ``nccl`` is selected instead.
    """
    # address and port of the master, so this child process can coordinate
    os.environ.update(MASTER_ADDR='localhost', MASTER_PORT='12355')

    # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
    selected = 'nccl' if torch.cuda.is_available() else backend
    # sets up the default distributed process group (and with it the distributed package)
    dist.init_process_group(selected, rank=rank, world_size=world_size)

def cleanup():
    """Destroy this process's process group and deinitialize the distributed package."""
    dist.destroy_process_group()

def run_parallel_training_loop(rank, world_size):
    """
    Train one DDP replica on its share of the batches; run in each distributed process.

    Args:
        rank: index of this process in the group; also used as the GPU id when CUDA is available.
        world_size: total number of processes taking part in the training.

    Note: as DDP broadcasts model states from rank 0 process to all other processes in the DDP constructor,
    you don’t need to worry about different DDP processes start from different model parameter initial values.
    """
    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    setup_process(rank, world_size)
    # Divide the cores among the processes so that their intra-op thread pools
    # do not compete for the same CPUs.
    # NOTE(review): this evaluates to 0 when world_size exceeds the core count;
    # torch presumably rejects a non-positive thread count (confirm).
    torch.set_num_threads(mp.cpu_count() // world_size)
    # create model and move it to GPU with id rank
    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
    # ddp_model = DDP(model, device_ids=[rank])
    ddp_model = DDP(model)

    # The loss and the optimizer do not depend on the batch, so they are built once.
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
    on_gpu = torch.cuda.is_available()
    # every world_size-th batch, starting at this process's rank
    shard = data[rank::world_size]

    for _ in range(num_epochs):
        for x, y in shard:
            if on_gpu:
                # inputs AND labels must live on the same device as the model replica
                x, y = x.to(rank), y.to(rank)

            optimizer.zero_grad()
            outputs = ddp_model(x)
            # Gradient synchronization communications take place during the backward pass and overlap with the backward computation.
            loss_fn(outputs, y).backward()  # When the backward() returns, param.grad already contains the synchronized gradient tensor.
            # Every process performs its own step; because all replicas hold the same
            # synchronized gradients, they all apply the same update and stay identical.
            optimizer.step()

    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # Destroy a given process group, and deinitialize the distributed package
    cleanup()

def main():
    """
    Parse ``--world-size`` from the command line and spawn that many training processes.

    Exits with a usage error when the world size is not positive or does not
    divide the number of batches evenly, since each process takes an equal share.
    """
    print()
    print('running main()')
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    parser = argparse.ArgumentParser()
    parser.add_argument('--world-size', default=1, type=int)
    args = parser.parse_args()
    # Explicit checks instead of `assert`: asserts are stripped under `python -O`,
    # and a world size of 0 would otherwise fail with a ZeroDivisionError.
    if args.world_size < 1:
        parser.error('--world-size must be a positive integer')
    if num_batches % args.world_size != 0:
        parser.error(f'--world-size must divide the number of batches ({num_batches})')
    mp.spawn(run_parallel_training_loop, args=(args.world_size,), nprocs=args.world_size)

# Run the experiment only when the file is executed as a script, not when it is imported.
if __name__ == "__main__":
    print('starting __main__')
    main()
    # '\a' is the terminal bell character, giving an audible cue on completion
    print('Done!\a\n')
$ time python3 ddp.py --world-size 1 > /dev/null

real    0m59.092s
user    8m46.589s
sys     0m7.320s

$ time python3 ddp.py --world-size 1 > /dev/null

real    1m11.124s
user    10m54.209s
sys     0m9.595s

$ time python3 ddp.py --world-size 6 > /dev/null

real    0m18.348s
user    2m28.799s
sys     0m18.068s
$ time python3 ddp.py --world-size 12 > /dev/null

real    0m26.352s
user    4m3.074s
sys     0m39.179s
$ time python3 ddp.py --world-size 3 > /dev/null

real    0m23.047s
user    3m51.172s
sys     0m11.483s
$ time python3 ddp.py --world-size 4 > /dev/null

real    0m18.195s
user    2m55.241s
sys     0m12.841s
$ time python3 ddp.py --world-size 2 > /dev/null

real    0m26.955s
user    4m15.837s
sys     0m7.127s

If I remove the line

torch.set_num_threads(mp.cpu_count() // world_size)
$ time python3 ddp.py --world-size 4 > /dev/null

real    0m40.574s
user    6m39.176s
sys     0m19.025s

$ time python3 ddp.py --world-size 2 > /dev/null

real    0m28.066s
user    3m17.775s
sys     0m8.410s

$ time python3 ddp.py --world-size 1 > /dev/null

real    0m37.114s
user    2m19.743s
sys     0m4.866s

Using

torch.set_num_threads(mp.cpu_count() // world_size // 2)
$ time python3 ddp.py --world-size 6 > /dev/null

real    0m16.399s
user    1m38.915s
sys     0m20.780s

$ time python3 ddp.py --world-size 4 > /dev/null

real    0m15.649s
user    1m1.821s
sys     0m13.589s

$ time python3 ddp.py --world-size 3 > /dev/null

real    0m16.947s
user    1m29.696s
sys     0m10.069s

$ time python3 ddp.py --world-size 2 > /dev/null

real    0m21.851s
user    2m4.564s
sys     0m7.486s

My Opinion

DDP on a single node does not seem particularly advantageous, unless you have a model that does a lot of work that is not handled well by PyTorch's intraop parallelism, you have large batches, and preferably the model has fewer parameters and more operations, meaning fewer gradients to synchronize, e.g. a convolutional model on a very large input.

Another scenario where DDP might be helpful is when you are using too much Python in your model instead of vectorized operations.

like image 98
Bob Avatar answered Oct 25 '22 17:10

Bob