
How to catch exceptions in workers in Multiprocessing

I'm working with the multiprocessing module in Python (2.7.3) and want to debug some stuff going on in my workers. However, I can't seem to catch any exceptions raised in the worker processes.

A minimal example:

import multiprocessing as mp


a=[1]
def worker():
    print a[2]


def pool():
    pool = mp.Pool(processes=1)
    pool.apply_async(worker, args = ())
    pool.close()
    pool.join()
    print "Multiprocessing done!"


if __name__ == '__main__':
    pool()

This is expected to raise an IndexError, but the only output is

    Multiprocessing done!

Is there a way to surface all exceptions occurring in the worker processes without manually raising my own?

asked Feb 28 '14 by Dschoni

People also ask

How do you use a lock in multiprocessing?

Python provides a mutual exclusion lock for use with processes via the multiprocessing.Lock class. An instance of the lock can be created, acquired by a process before entering a critical section, and released afterwards. Only one process can hold the lock at any time.
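That acquire/release discipline can be sketched as follows (Python 3 syntax, while the question targets 2.7; the function names are illustrative):

```python
import multiprocessing as mp

def append_safely(lock, shared_list, value):
    # Acquire the lock before touching the shared structure and
    # release it afterwards; the "with" statement does both.
    with lock:
        shared_list.append(value)

def run():
    manager = mp.Manager()
    shared = manager.list()       # a list proxy shared across processes
    lock = mp.Lock()
    procs = [mp.Process(target=append_safely, args=(lock, shared, i))
             for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return sorted(shared)         # order of appends is not deterministic

if __name__ == "__main__":
    print(run())
```

Here the lock guards the append so only one process mutates the shared list at a time.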

What is forking in multiprocessing?

When a process is forked, the child process inherits all the same variables in the same state as they were in the parent. Each child process then continues independently from the forking point. The pool divides the args between the children and they work through them sequentially.
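Under a fork start method (the default on Linux, and the only one in Python 2.7) this inheritance can be observed directly. A small sketch with illustrative names, using mp.get_context (available from Python 3.4) to force fork explicitly:

```python
import multiprocessing as mp

state = {"value": 0}  # module-level state, copied into children on fork

def child_view(_):
    # Each forked child sees a copy of the parent's state as it was
    # at the moment the pool forked.
    return state["value"]

def run():
    state["value"] = 42            # mutate in the parent before forking
    ctx = mp.get_context("fork")   # force fork so the behaviour is visible
    pool = ctx.Pool(processes=2)
    results = pool.map(child_view, range(4))  # pool divides the args
    pool.close()
    pool.join()
    return results

if __name__ == "__main__":
    print(run())
```

Every child reports the mutated value, because the mutation happened in the parent before the fork.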

Is multiprocessing queue process safe?

Yes, it is. From https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes: Queues are thread and process safe.
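That guarantee means a plain multiprocessing.Queue can pass values between processes without any extra locking; a minimal sketch (illustrative names):

```python
import multiprocessing as mp

def producer(queue, items):
    # Puts are safe even with several processes writing concurrently.
    for item in items:
        queue.put(item)
    queue.put(None)  # sentinel marking the end of the stream

def run():
    queue = mp.Queue()
    proc = mp.Process(target=producer, args=(queue, [1, 2, 3]))
    proc.start()
    received = []
    while True:
        item = queue.get()
        if item is None:
            break
        received.append(item)
    proc.join()
    return received

if __name__ == "__main__":
    print(run())
```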

How does Python handle multiprocessing?

The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine.
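Because each worker is a separate process with its own interpreter (and its own GIL), CPU-bound work can run truly in parallel; a minimal sketch:

```python
import multiprocessing as mp

def square(n):
    # CPU-bound work runs in a separate process, unconstrained by
    # the parent interpreter's GIL.
    return n * n

def run():
    pool = mp.Pool(processes=mp.cpu_count())
    results = pool.map(square, range(5))
    pool.close()
    pool.join()
    return results

if __name__ == "__main__":
    print(run())
```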


2 Answers

The error is not re-raised in the parent unless you call the get() method of the AsyncResult object (the return value of apply_async):

According to the AsyncResult.get documentation:

Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().

def pool():
    pool = mp.Pool(processes=1)
    result = pool.apply_async(worker, args=())
    result.get() # <-- re-raises the worker's IndexError here
    pool.close()
    pool.join()
    print "Multiprocessing done!"
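With result.get() in place, the worker's exception propagates into the parent, where it can be caught like any other (sketched here in Python 3 syntax, while the question targets 2.7):

```python
import multiprocessing as mp

a = [1]

def worker():
    return a[2]  # index out of range: raises IndexError in the worker

def run():
    pool = mp.Pool(processes=1)
    result = pool.apply_async(worker)
    try:
        result.get()  # re-raises the worker's IndexError here
    except IndexError as exc:
        return type(exc).__name__
    finally:
        pool.close()
        pool.join()

if __name__ == "__main__":
    print(run())  # prints "IndexError"
```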
answered Oct 19 '22 by falsetru


I think falsetru gave you what you need. I'd just like to expand a little more.

If it's important for you to get not only the error but the original context (i.e. to know that the exception occurred on line 1 of worker()) then you can check this nice post by Ned Batchelder which explains how to reraise exceptions with their original context.

That approach doesn't apply directly to mp.Pool, so it's only relevant if you need something more. This SO question covers your problem using more explicit multiprocessing techniques instead of mp.Pool.
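One common workaround for mp.Pool along those lines is to capture the worker-side traceback as text, since the traceback object itself cannot be pickled back to the parent. A sketch with illustrative names:

```python
import functools
import multiprocessing as mp
import traceback

def with_traceback(func):
    # Illustrative decorator: format the worker-side traceback into a
    # string and attach it to the exception that crosses the pickle
    # boundary back to the parent.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            raise RuntimeError(traceback.format_exc())
    return wrapper

@with_traceback
def worker():
    return [1][2]  # IndexError inside the worker

def run():
    pool = mp.Pool(processes=1)
    result = pool.apply_async(worker)
    try:
        result.get()
    except RuntimeError as exc:
        return str(exc)  # contains the worker's original traceback text
    finally:
        pool.close()
        pool.join()

if __name__ == "__main__":
    print(run())
```

The parent sees a RuntimeError whose message includes the original file, line, and exception type from inside the worker.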

answered Oct 19 '22 by KobeJohn