Logo Questions Linux Laravel Mysql Ubuntu Git Menu

Sharing a counter with multiprocessing.Pool

I'd like to use multiprocessing.Value + multiprocessing.Lock to share a counter between separate processes. For example:

import itertools as it
import multiprocessing

def func(x, val, lock):
    for i in range(x):
        i ** 2
    with lock:
        val.value += 1
        print('counter incremented to:', val.value)

if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    with multiprocessing.Pool() as pool:
        pool.starmap(func, ((i, v, lock) for i in range(25)))

This will throw the following exception:

RuntimeError: Synchronized objects should only be shared between processes through inheritance

What I am most confused by is that a related (albeit not completely analogous) pattern works with multiprocessing.Process():

if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    procs = [multiprocessing.Process(target=func, args=(i, v, lock))
             for i in range(25)]
    for p in procs: p.start()
    for p in procs: p.join()

Now, I recognize that these are two different markedly things:

  • the first example uses a number of worker processes equal to cpu_count(), and splits an iterable range(25) between them
  • the second example creates 25 worker processes and tasks each with one input

That said: how can I share an instance with pool.starmap() (or pool.map()) in this manner?

I've seen similar questions here, here, and here, but those approaches doesn't seem to be suited to .map()/.starmap(), regarldess of whether Value uses ctypes.c_int.

I realize that this approach technically works:

def func(x):
    for i in range(x):
        i ** 2
    with lock:
        v.value += 1
        print('counter incremented to:', v.value)

v = None
lock = None

def set_global_counter_and_lock():
    """Egh ... """
    global v, lock
    if not any((v, lock)):
        v = multiprocessing.Value('i', 0)
        lock = multiprocessing.Lock()

if __name__ == '__main__':
    # Each worker process will call `initializer()` when it starts.
    with multiprocessing.Pool(initializer=set_global_counter_and_lock) as pool:
        pool.map(func, range(25))

Is this really the best-practices way of going about this?

like image 238
Brad Solomon Avatar asked Dec 04 '18 16:12

Brad Solomon

People also ask

When would you use a multiprocessing pool?

Understand multiprocessing in no more than 6 minutes Multiprocessing is quintessential when a long-running process has to be speeded up or multiple processes have to execute parallelly. Executing a process on a single core confines its capability, which could otherwise spread its tentacles across multiple cores.

What is multiprocess synchronization?

Multiprocessor system facilitates parallel program execution and read/write sharing of data and thus may cause the processors to concurrently access location in the shared memory. Therefore, a correct and reliable mechanism is needed to serialize this access.

Does multiprocessing shared memory?

Shared memory : multiprocessing module provides Array and Value objects to share data between processes. Array: a ctypes array allocated from shared memory. Value: a ctypes object allocated from shared memory.

How do processes pools work in multiprocessing?

Pool allows multiple jobs per process, which may make it easier to parallel your program. If you have a numbers jobs to run in parallel, you can make a Pool with number of processes the same number of as CPU cores and after that pass the list of the numbers jobs to pool. map.

How many processes share a counter?

This code is a demonstration of the problem, distilling only the usage of the shared counter. A "pool" of 10 processes is created to run the func function. All processes share a Value and increment it 50 times.

What is Multiprocessing pool in Java?

multiprocessing.pool objects have internal resources that need to be properly managed (like any other resource) by using the pool as a context manager or by calling close () and terminate () manually. Failure to do this can lead to the process hanging on finalization.

How do I share data between processes in the multiprocessing module?

One of the methods of exchanging data between processes with the multiprocessing module is directly shared memory via multiprocessing.Value. As any method that's very general, it can sometimes be tricky to use. I've seen a variation of this question asked a couple of times on StackOverflow:

How do I create a Shared Queue in multiprocessing?

Note that one can also create a shared queue by using a manager object – see Managers. multiprocessing uses the usual queue.Empty and queue.Full exceptions to signal a timeout. They are not available in the multiprocessing namespace so you need to import them from queue.

1 Answers

The RuntimeError you get when using Pool is because arguments for pool-methods are pickled before being send over a (pool-internal) queue to the worker processes. Which pool-method you are trying to use is irrelevant here. This doesn't happen when you just use Process because there is no queue involved. You can reproduce the error just with pickle.dumps(multiprocessing.Value('i', 0)).

Your last code snippet doesn't work how you think it works. You are not sharing a Value, you are recreating independent counters for every child process.

In case you were on Unix and used the default start-method "fork", you would be done with just not passing the shared objects as arguments into the pool-methods. Your child-processes would inherit the globals through forking. With process-start-methods "spawn" (default Windows and macOS with Python 3.8+) or "forkserver", you'll have to use the initializer during Pool instantiation, to let the child-processes inherit the shared objects.

Note, you don't need an extra multiprocessing.Lock here, because multiprocessing.Value comes by default with an internal one you can use.

import os
from multiprocessing import Pool, Value #, set_start_method

def func(x):
    for i in range(x):
        assert i == i
        with cnt.get_lock():
            cnt.value += 1
            print(f'{os.getpid()} | counter incremented to: {cnt.value}\n')

def init_globals(counter):
    global cnt
    cnt = counter

if __name__ == '__main__':

    # set_start_method('spawn')

    cnt = Value('i', 0)
    iterable = [10000 for _ in range(10)]

    with Pool(initializer=init_globals, initargs=(cnt,)) as pool:
        pool.map(func, iterable)

    assert cnt.value == 100000

Probably worth noting as well is that you don't need the counter to be shared in all cases. If you just need to keep track of how often something has happened in total, an option would be to keep separate worker-local counters during computation which you sum up at the end. This could result in a significant performance improvement for frequent counter updates for which you don't need synchronization during the parallel computation itself.

like image 114
Darkonaut Avatar answered Oct 17 '22 01:10
