Where should I allocate the pool in Python multiprocessing?

I want to distribute a Python application over several CPU cores, and from the documentation I understand that a pool is the way to do this.

My problem can be reproduced by the following code:

#!/usr/bin/python3
import multiprocessing as mp
import multiprocessing.managers
import time

def poolswimmer():
    for i in range(5):
        print(i)
        time.sleep(1)

def poolprocess(_pool):
    res = _pool.apply_async(poolswimmer)
    res.wait()

if __name__ == "__main__":
    mp.freeze_support()
    mpc = mp.get_context('spawn')
    manager = mpc.Manager()
    pool = manager.Pool()
    p = mp.Process(target=poolprocess, args=(pool,))
    p.start()
    p.join()
    pool.close()

poolprocess is the I/O-intensive part and poolswimmer is the CPU-intensive part of a job that is executed in loops for different peripherals.

I get the following error:

Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/dad/sandbox/ImageProcessing/poolrider.py", line 12, in poolprocess
    res = _pool.apply_async(poolswimmer)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 2, in apply_async
  File "/usr/lib/python3.12/multiprocessing/managers.py", line 827, in _callmethod
    proxytype = self._manager._registry[token.typeid][-1]
                ^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute '_registry'

The object inspector tells me that my pool variable is an instance of a PoolProxy object, so it should be OK to call it from the process. Where am I wrong? Is there a better way to achieve what I'm trying to do?

asked Feb 18 '26 11:02 by tommiport5

1 Answer

There are several cases, and it's not clear from your post which one applies, so let's take each case in turn:

Case 1

For a given task, the I/O-bound processing time is greater than the CPU-bound processing time, and the I/O-bound processing does not need a result from the CPU-bound processing. In this case, I will assume that the I/O-bound processing is not greatly affected by the GIL and thus can be performed concurrently using a multithreading pool.
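That assumption is easy to check in isolation. The sketch below uses time.sleep as a stand-in for blocking I/O, the same emulation the examples in this answer use; whether your real peripheral I/O releases the GIL the same way is an assumption you would need to verify for the library you actually use. The names fake_io and run_io_tasks are hypothetical.

```python
from multiprocessing.pool import ThreadPool
import time

def fake_io(delay):
    # time.sleep releases the GIL while it waits, as blocking file or socket I/O does
    time.sleep(delay)
    return delay

def run_io_tasks(n_tasks=10, delay=0.2):
    """Run n_tasks emulated I/O calls on a thread pool; return (elapsed, results)."""
    t0 = time.monotonic()
    with ThreadPool(n_tasks) as pool:  # one thread per task
        results = pool.map(fake_io, [delay] * n_tasks)
    return time.monotonic() - t0, results

if __name__ == "__main__":
    elapsed, results = run_io_tasks()
    print(f'{len(results)} I/O tasks took {elapsed:.2f} sec.')
```

Run one after another, 10 tasks of 0.2 seconds would take about 2 seconds; because the GIL is released during each sleep, the threaded version should finish in roughly 0.2 seconds.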

Here there are 30 tasks, and for each task the I/O-bound processing takes 1 second and the CPU-bound processing takes 0.1 second:

#!/usr/bin/python3
from multiprocessing.pool import Pool as ProcessPool
from multiprocessing.pool import ThreadPool
import multiprocessing as mp
from functools import partial
import time

def poolswimmer(arg):
    # emulate doing CPU bound processing:
    time.sleep(.1)

    result = sum(range(arg))
    print(f'Argument {arg} handled by process {mp.current_process().pid}', flush=True)

def threaded_worker(process_pool, arg):
    # Emulate I/O-bound processing:
    time.sleep(1)

    # Do CPU bound processing asynchronously:
    process_pool.apply_async(poolswimmer, args=(arg,))

if __name__ == "__main__":
    mp.set_start_method('spawn')

    args = list(range(0, 30))
    THREAD_POOL_SIZE = len(args)

    process_pool = ProcessPool(min(THREAD_POOL_SIZE, mp.cpu_count()))
    thread_pool = ThreadPool(THREAD_POOL_SIZE)
    t0 = time.monotonic()
    results = thread_pool.map(partial(threaded_worker, process_pool), args)
    thread_pool.close()
    thread_pool.join()
    process_pool.close()
    process_pool.join()
    elapsed = time.monotonic() - t0

    print(f'elapsed time = {elapsed} sec.')

Prints:

Argument 14 handled by process 2928
Argument 10 handled by process 12516
Argument 11 handled by process 23240
Argument 15 handled by process 1440
Argument 13 handled by process 22252
Argument 12 handled by process 22328
Argument 9 handled by process 12272
Argument 8 handled by process 4956
Argument 7 handled by process 2928
Argument 5 handled by process 12516
Argument 23 handled by process 23240
Argument 4 handled by process 1440
Argument 3 handled by process 22252
Argument 2 handled by process 22328
Argument 0 handled by process 12272
Argument 1 handled by process 4956
Argument 6 handled by process 2928
Argument 22 handled by process 12516
Argument 19 handled by process 1440
Argument 20 handled by process 23240
Argument 18 handled by process 22252
Argument 27 handled by process 22328
Argument 16 handled by process 12272
Argument 24 handled by process 4956
Argument 26 handled by process 2928
Argument 17 handled by process 12516
Argument 29 handled by process 23240
Argument 21 handled by process 1440
Argument 28 handled by process 22252
Argument 25 handled by process 22328
elapsed time = 1.4380000000237487 sec.

Case 2

Same as Case 1, except the I/O-bound processing needs to wait for the result of the CPU-bound processing.

#!/usr/bin/python3
from multiprocessing.pool import Pool as ProcessPool
from multiprocessing.pool import ThreadPool
import multiprocessing as mp
from functools import partial
import time

def poolswimmer(arg):
    # emulate doing CPU bound processing:
    time.sleep(.1)

    result = sum(range(arg))
    print(f'Argument {arg} handled by process {mp.current_process().pid}', flush=True)
    return result

def threaded_worker(process_pool, arg):
    # Emulate I/O-bound processing:
    time.sleep(1)

    # Do CPU bound processing synchronously:
    return process_pool.apply(poolswimmer, args=(arg,))

if __name__ == "__main__":
    mp.set_start_method('spawn')

    args = list(range(0, 30))
    THREAD_POOL_SIZE = len(args)

    process_pool = ProcessPool(min(THREAD_POOL_SIZE, mp.cpu_count()))
    thread_pool = ThreadPool(THREAD_POOL_SIZE)
    t0 = time.monotonic()
    results = thread_pool.map(partial(threaded_worker, process_pool), args)
    thread_pool.close()
    thread_pool.join()
    process_pool.close()
    process_pool.join()
    elapsed = time.monotonic() - t0

    print('results =', results)
    print(f'elapsed time = {elapsed} sec.')

Prints:

Argument 0 handled by process 14352
Argument 28 handled by process 23068
Argument 26 handled by process 14712
Argument 27 handled by process 13208
Argument 25 handled by process 16324
Argument 24 handled by process 17380
Argument 23 handled by process 8416
Argument 22 handled by process 14088
Argument 21 handled by process 14352
Argument 20 handled by process 23068
Argument 19 handled by process 16324
Argument 17 handled by process 13208
Argument 15 handled by process 17380
Argument 16 handled by process 8416
Argument 18 handled by process 14712
Argument 14 handled by process 14088
Argument 13 handled by process 14352
Argument 12 handled by process 23068
Argument 11 handled by process 16324
Argument 10 handled by process 13208
Argument 9 handled by process 17380
Argument 8 handled by process 8416
Argument 7 handled by process 14712
Argument 6 handled by process 14088
Argument 5 handled by process 14352
Argument 3 handled by process 16324
Argument 4 handled by process 23068
Argument 29 handled by process 8416
Argument 2 handled by process 13208
Argument 1 handled by process 17380
results = [0, 0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55, 66, 78, 91, 105, 120, 136, 153, 171, 190, 210, 231, 253, 276, 300, 325, 351, 378, 406]
elapsed time = 1.4849999999860302 sec.

As you can see, the Case 1 and Case 2 timings are essentially the same.
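A rough model suggests why the two timings agree. Assuming one thread per task (so all of the I/O overlaps) and 8 worker processes (inferred from the 8 distinct PIDs in the printed output, not stated anywhere), the CPU-bound jobs run in ceil(30 / 8) = 4 waves once the I/O finishes. The sketch ignores process start-up, pickling and scheduling overhead, so it is a rough lower bound rather than a prediction; two_pool_estimate is a hypothetical helper.

```python
import math

def two_pool_estimate(n_tasks, io_time, cpu_time, n_procs):
    """Rough lower bound on elapsed time for the thread-pool + process-pool design.

    Assumes every task gets its own thread, so all I/O overlaps, and that the
    CPU-bound jobs then queue on n_procs worker processes. Overhead is ignored.
    """
    waves = math.ceil(n_tasks / n_procs)  # rounds of CPU work each process handles
    return io_time + waves * cpu_time

if __name__ == "__main__":
    # 8 processes is an assumption inferred from the PIDs printed above
    print(two_pool_estimate(30, io_time=1.0, cpu_time=0.1, n_procs=8))
```

This gives about 1.4 sec. for Cases 1 and 2, close to the measured 1.44 and 1.48 sec. Swapping the times to match Case 3 below gives 0.1 + 4 * 1 = 4.1 sec.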

Case 3

Now, for each task, the CPU-bound processing takes 1 second and the I/O-bound processing takes 0.1 second.

#!/usr/bin/python3
from multiprocessing.pool import Pool as ProcessPool
from multiprocessing.pool import ThreadPool
import multiprocessing as mp
from functools import partial
import time

def poolswimmer(arg):
    # emulate doing CPU bound processing:
    time.sleep(1)

    result = sum(range(arg))
    print(f'Argument {arg} handled by process {mp.current_process().pid}', flush=True)
    return result

def threaded_worker(process_pool, arg):
    # Emulate I/O-bound processing:
    time.sleep(.1)

    # Do CPU bound processing synchronously:
    return process_pool.apply(poolswimmer, args=(arg,))

if __name__ == "__main__":
    mp.set_start_method('spawn')

    args = list(range(0, 30))
    THREAD_POOL_SIZE = len(args)

    process_pool = ProcessPool(min(THREAD_POOL_SIZE, mp.cpu_count()))
    thread_pool = ThreadPool(THREAD_POOL_SIZE)
    t0 = time.monotonic()
    results = thread_pool.map(partial(threaded_worker, process_pool), args)
    thread_pool.close()
    thread_pool.join()
    process_pool.close()
    process_pool.join()
    elapsed = time.monotonic() - t0

    print('results =', results)
    print(f'elapsed time = {elapsed} sec.')

Prints:

Argument 1 handled by process 12068
Argument 20 handled by process 15380
Argument 18 handled by process 11868
Argument 0 handled by process 17728
Argument 21 handled by process 21876
Argument 16 handled by process 22400
Argument 19 handled by process 16580
Argument 23 handled by process 21084
Argument 12 handled by process 12068
Argument 17 handled by process 15380
Argument 8 handled by process 11868
Argument 15 handled by process 17728
Argument 14 handled by process 21876
Argument 9 handled by process 22400
Argument 5 handled by process 16580
Argument 13 handled by process 21084
Argument 22 handled by process 12068
Argument 4 handled by process 15380
Argument 29 handled by process 11868
Argument 6 handled by process 17728
Argument 7 handled by process 21876
Argument 10 handled by process 22400
Argument 3 handled by process 16580
Argument 2 handled by process 21084
Argument 25 handled by process 12068
Argument 27 handled by process 15380
Argument 28 handled by process 11868
Argument 11 handled by process 21876
Argument 24 handled by process 17728
Argument 26 handled by process 22400
results = [0, 0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55, 66, 78, 91, 105, 120, 136, 153, 171, 190, 210, 231, 253, 276, 300, 325, 351, 378, 406]
elapsed time = 4.172000000020489 sec.

Case 4

This is the same as Case 3, but now we use just a single multiprocessing pool to do both the I/O-bound and the CPU-bound processing.

#!/usr/bin/python3
from multiprocessing.pool import Pool as ProcessPool
import multiprocessing as mp
import time

def poolswimmer(arg):
    # emulate doing CPU bound processing:
    time.sleep(1)

    result = sum(range(arg))
    print(f'Argument {arg} handled by process {mp.current_process().pid}', flush=True)
    return result

def pool_worker(arg):
    # Emulate I/O-bound processing:
    time.sleep(.1)

    # Do CPU bound processing synchronously:
    return poolswimmer(arg)

if __name__ == "__main__":
    mp.set_start_method('spawn')

    args = list(range(0, 30))

    process_pool = ProcessPool(min(len(args), mp.cpu_count()))
    t0 = time.monotonic()
    results = process_pool.map(pool_worker, args)
    process_pool.close()
    process_pool.join()
    elapsed = time.monotonic() - t0

    print('results =', results)
    print(f'elapsed time = {elapsed} sec.')

Prints:

Argument 0 handled by process 6484
Argument 1 handled by process 11248
Argument 2 handled by process 1752
Argument 3 handled by process 16428
Argument 4 handled by process 16056
Argument 5 handled by process 12944
Argument 6 handled by process 16392
Argument 7 handled by process 17788
Argument 8 handled by process 6484
Argument 9 handled by process 11248
Argument 10 handled by process 1752
Argument 11 handled by process 16428
Argument 12 handled by process 16056
Argument 13 handled by process 12944
Argument 14 handled by process 16392
Argument 15 handled by process 17788
Argument 16 handled by process 6484
Argument 17 handled by process 11248
Argument 18 handled by process 1752
Argument 19 handled by process 16428
Argument 21 handled by process 12944
Argument 20 handled by process 16056
Argument 22 handled by process 16392
Argument 23 handled by process 17788
Argument 24 handled by process 6484
Argument 25 handled by process 11248
Argument 26 handled by process 1752
Argument 27 handled by process 16428
Argument 28 handled by process 12944
Argument 29 handled by process 16056
results = [0, 0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55, 66, 78, 91, 105, 120, 136, 153, 171, 190, 210, 231, 253, 276, 300, 325, 351, 378, 406]
elapsed time = 4.625 sec.

Case 5

Here we use a single multiprocessing pool; the I/O-bound processing time is 1 second and the CPU-bound processing time is 0.1 second. Since the processes will be doing mostly I/O-bound processing, we do not limit the pool size to the number of processors available.

#!/usr/bin/python3
from multiprocessing.pool import Pool as ProcessPool
import multiprocessing as mp
import time

def poolswimmer(arg):
    # emulate doing CPU bound processing:
    time.sleep(.1)

    result = sum(range(arg))
    print(f'Argument {arg} handled by process {mp.current_process().pid}', flush=True)
    return result

def pool_worker(arg):
    # Emulate I/O-bound processing:
    time.sleep(1)

    # Do CPU bound processing synchronously:
    return poolswimmer(arg)

if __name__ == "__main__":
    mp.set_start_method('spawn')

    args = list(range(0, 30))

    process_pool = ProcessPool(len(args))
    t0 = time.monotonic()
    results = process_pool.map(pool_worker, args)
    process_pool.close()
    process_pool.join()
    elapsed = time.monotonic() - t0

    print('results =', results)
    print(f'elapsed time = {elapsed} sec.')

Prints:

Argument 0 handled by process 21352
Argument 1 handled by process 21244
Argument 2 handled by process 10672
Argument 3 handled by process 10872
Argument 4 handled by process 20632
Argument 5 handled by process 22264
Argument 6 handled by process 7112
Argument 7 handled by process 12484
Argument 8 handled by process 22496
Argument 9 handled by process 4084
Argument 10 handled by process 18360
Argument 11 handled by process 6316
Argument 12 handled by process 11524
Argument 13 handled by process 8784
Argument 14 handled by process 17504
Argument 15 handled by process 18168
Argument 16 handled by process 4960
Argument 17 handled by process 23100
Argument 18 handled by process 23188
Argument 19 handled by process 13984
Argument 20 handled by process 16776
Argument 21 handled by process 13148
Argument 23 handled by process 20968
Argument 22 handled by process 11500
Argument 25 handled by process 20344
Argument 24 handled by process 18092
Argument 26 handled by process 18564
Argument 27 handled by process 17388
Argument 28 handled by process 21768
Argument 29 handled by process 18752
results = [0, 0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55, 66, 78, 91, 105, 120, 136, 153, 171, 190, 210, 231, 253, 276, 300, 325, 351, 378, 406]
elapsed time = 1.8589999999967404 sec.

The elapsed time of ~1.86 seconds should be compared with the elapsed time of ~1.48 seconds from Case 2, which uses two types of pools.
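The same kind of rough model can be applied to the single-pool cases, where each worker process spends io_time + cpu_time on every task it picks up. Again this is a hypothetical helper that ignores start-up and IPC overhead, and it assumes 8 processes for Case 4 (inferred from the printed PIDs):

```python
import math

def single_pool_estimate(n_tasks, io_time, cpu_time, pool_size):
    """Rough lower bound when each pool process does both the I/O and the CPU work.

    Every task occupies one worker for io_time + cpu_time, and the tasks run
    in ceil(n_tasks / pool_size) waves. Start-up and IPC overhead are ignored.
    """
    waves = math.ceil(n_tasks / pool_size)
    return waves * (io_time + cpu_time)

if __name__ == "__main__":
    # Case 4: pool limited to 8 processes (an inference from the printed PIDs)
    print(single_pool_estimate(30, io_time=0.1, cpu_time=1.0, pool_size=8))
    # Case 5: one process per task
    print(single_pool_estimate(30, io_time=1.0, cpu_time=0.1, pool_size=30))
```

This gives about 4.4 sec. for Case 4 (measured 4.625 sec.) and about 1.1 sec. for Case 5 (measured about 1.86 sec.). On paper a 30-process pool should therefore beat Case 2's roughly 1.4 sec.; that it did not suggests the overhead the model leaves out, plausibly the cost of starting 30 spawned worker processes, is significant. That explanation is a guess that these measurements do not isolate.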

Conclusion

When the I/O-bound processing time is much less than the CPU-bound processing time (Cases 3 and 4), using just a multiprocessing pool is almost, but not quite, as efficient as using two types of pools. When the I/O-bound processing time dominates (Case 5), a single multiprocessing pool only keeps up if it has roughly one process per task, and the largest multiprocessing pool you can create on your platform may be considerably smaller than the number of tasks you need to process.

Using both a multiprocessing pool (for the CPU-bound processing) and a multithreading pool (for the I/O-bound processing) seems to offer the best performance, regardless of the relative CPU-bound and I/O-bound processing times.
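Applied to the question's own function names, the two-pool pattern might look like the sketch below. It is an adaptation, not code from the examples above: the peripheral names, the 0.1-second sleep standing in for device I/O, and the sum-of-squares workload in poolswimmer are all hypothetical. The relevant point for the original question is where the pool lives: the process pool is created once in the main process and handed to threads, which share it directly, so no Manager proxy has to be passed to a child process.

```python
#!/usr/bin/python3
from multiprocessing.pool import Pool, ThreadPool
import multiprocessing as mp
from functools import partial
import time

def poolswimmer(n):
    # CPU-bound part; runs in a worker process of the process pool.
    # Hypothetical stand-in workload: sum of squares below n.
    return sum(i * i for i in range(n))

def poolprocess(process_pool, peripheral):
    # I/O-bound part; runs in a thread of the main process, so it can use
    # process_pool directly without any pickling of the pool object.
    time.sleep(0.1)  # hypothetical stand-in for talking to a peripheral
    # apply() blocks only this thread until the worker process returns.
    return peripheral, process_pool.apply(poolswimmer, args=(10_000,))

def run(peripherals):
    # Both pools are created in the main process only.
    with Pool(min(len(peripherals), mp.cpu_count())) as process_pool:
        with ThreadPool(len(peripherals)) as thread_pool:
            return thread_pool.map(partial(poolprocess, process_pool), peripherals)

if __name__ == "__main__":
    print(run(['camera', 'scanner', 'sensor']))  # hypothetical peripheral names
```

Because poolprocess runs in a thread, a blocking process_pool.apply call only blocks that one thread, while the other threads keep servicing their peripherals.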

answered Feb 20 '26 01:02 by Booboo