Python multiprocessing and handling exceptions in workers

I use the Python multiprocessing library for an algorithm in which many workers process pieces of data and return results to the parent process. I use one multiprocessing.Queue to pass jobs to the workers, and a second one to collect the results.

It all works pretty well until a worker fails to process some chunk of data. In the simplified example below, each worker has two phases:

  • initialization - can fail; in this case the worker should be destroyed
  • data processing - processing a chunk of data can fail; in this case the worker should skip that chunk and continue with the next one.

When either of these phases fails, I get a deadlock at the end of the script. This code simulates my problem:

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.2
fail_job_p = 0.3


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except Exception:
        print("!Worker %d init fail" % args)
        return
    # DO WORK
    # Process jobs from the queue until the None stop token arrives
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except Exception:
            print("!Job %d failed, skip..." % job)
        finally:
            jobs_queue.task_done()
    # Acknowledge the None stop token so the parent's jobs.join() can return
    jobs_queue.task_done()



#========= Parent =========
jobs = mp.JoinableQueue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

# Collecting the results
# What if some workers failed to process the job and we have
# less results than expected
for r in range(results_to_expect):
    result = results.get()
    print(result)

# Signal all workers to finish
for i in range(workers_count):
    jobs.put(None)

# Wait for them to finish
jobs.join()

I have two questions about this code:

  1. When init() fails, how do I detect that a worker is invalid and avoid waiting for it to finish?
  2. When do_work() fails, how do I notify the parent process that fewer results should be expected in the results queue?

Thank you for your help!

asked Jun 05 '13 by Peter Popov



1 Answer

I changed your code slightly to make it work (see explanation below).

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.5
fail_job_p = 0.4


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except Exception:
        print("!Worker %d init fail" % args)
        result_queue.put('init failed')
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except Exception:
            print("!Job %d failed, skip..." % job)
            result_queue.put('job failed')


#========= Parent =========
jobs = mp.Queue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

init_failures = 0
job_failures = 0
successes = 0
while job_failures + successes < 30 and init_failures < workers_count:
    result = results.get()
    init_failures += int(result == 'init failed')
    job_failures += int(result == 'job failed')
    successes += int(result != 'init failed' and result != 'job failed')
    #print(init_failures, job_failures, successes)

for ii in range(workers_count):
    jobs.put(None)

My changes:

  1. Changed jobs to be just a normal Queue (instead of JoinableQueue). This is what removes the deadlock: with a JoinableQueue, jobs.join() blocks until task_done() has been called once for every put(), and a worker that dies during init never consumes (or acknowledges) its stop token, so the parent would wait forever.
  2. Workers now communicate back the special result strings "init failed" and "job failed" (a tagged-tuple variant of this is sketched below).
  3. The master process counts these special results and keeps collecting until either all 30 jobs are accounted for (success or failure) or every worker has died during initialization.
  4. In the end, put "stop" requests (i.e. None jobs) for however many workers you have, regardless. Note that not all of these may be pulled from the queue (in case a worker failed to initialize); since the queue is no longer joinable, that is harmless.
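
A variant of the same idea, as a sketch rather than part of the fix above: have the workers report tagged tuples instead of magic strings, so a legitimate result can never collide with a failure marker and the parent also learns which job failed. This reuses init(), do_work(), workers_count and the queues from the code above; the tags 'ok', 'job_failed' and 'init_failed' are just an assumed convention.

# Sketch: tagged-tuple results instead of sentinel strings.
# The tags ('ok', 'job_failed', 'init_failed') are an assumed convention.
def worker_function(args, jobs_queue, result_queue):
    try:
        state = init(args)
    except Exception:
        result_queue.put(('init_failed', args, None))
        return
    for job in iter(jobs_queue.get, None):
        try:
            result_queue.put(('ok', job, do_work(state, job)))
        except Exception:
            result_queue.put(('job_failed', job, None))

# The parent then unpacks the tag instead of comparing strings:
init_failures = job_failures = successes = 0
while job_failures + successes < 30 and init_failures < workers_count:
    tag, job, payload = results.get()
    if tag == 'init_failed':
        init_failures += 1
    elif tag == 'job_failed':
        job_failures += 1
    else:
        successes += 1
        print(payload)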

By the way, your original code was nice and easy to work with. The random probabilities bit is pretty cool.
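
As a side note: if each chunk can be treated as an independent task, the standard library's concurrent.futures module (Python 3.2+) sidesteps both questions, because an exception raised inside a worker is captured and re-raised in the parent when you call result() on the corresponding future, so nothing is silently lost. A minimal sketch, assuming do_work only needs the job argument:

import random
from concurrent.futures import ProcessPoolExecutor, as_completed

fail_job_p = 0.4

def do_work(arg):
    # Simulated failure, as in the original example
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "processed %d" % arg

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=5) as pool:
        futures = {pool.submit(do_work, j): j for j in range(30)}
        for future in as_completed(futures):
            try:
                # result() re-raises any exception from the worker here
                print(future.result())
            except Exception:
                print("!Job %d failed, skip..." % futures[future])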

answered Oct 16 '22 by Velimir Mlaker