I use the Python multiprocessing library for an algorithm in which many workers process certain data and return results to the parent process. I use one multiprocessing.Queue for passing jobs to the workers, and a second one to collect the results.
It all works pretty well, until a worker fails to process some chunk of data. In the simplified example below each worker has two phases: initialization (init()) and job processing (do_work()).
When either of these phases fails I get a deadlock after script completion. This code simulates my problem:
import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.2
fail_job_p = 0.3


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except:
        print "!Worker %d init fail" % args
        return

    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print "!Job %d failed, skip..." % job
        finally:
            jobs_queue.task_done()

    # Telling that we are done with processing stop token
    jobs_queue.task_done()

#========= Parent =========
jobs = mp.JoinableQueue()
results = mp.Queue()

for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

# Collecting the results
# What if some workers failed to process the job and we have
# less results than expected
for r in range(results_to_expect):
    result = results.get()
    print result

# Signal all workers to finish
for i in range(workers_count):
    jobs.put(None)

# Wait for them to finish
jobs.join()
I have two questions about this code:

1. When init() fails, how can I detect that the worker is invalid and avoid waiting for it to finish?
2. When do_work() fails, how can I notify the parent process that fewer results should be expected in the results queue?

Thank you for your help!
I changed your code slightly to make it work (see explanation below).
import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.5
fail_job_p = 0.4


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except:
        print "!Worker %d init fail" % args
        result_queue.put('init failed')
        return

    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print "!Job %d failed, skip..." % job
            result_queue.put('job failed')

#========= Parent =========
jobs = mp.Queue()
results = mp.Queue()

for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

init_failures = 0
job_failures = 0
successes = 0
while job_failures + successes < 30 and init_failures < workers_count:
    result = results.get()
    init_failures += int(result == 'init failed')
    job_failures += int(result == 'job failed')
    successes += int(result != 'init failed' and result != 'job failed')
    #print init_failures, job_failures, successes

for ii in range(workers_count):
    jobs.put(None)
My changes:

- Changed jobs to be just a normal Queue (instead of JoinableQueue).
- Workers now put a sentinel string on the result queue when something goes wrong: 'init failed' if init() raises, 'job failed' if do_work() raises.
- The parent counts successes and both kinds of failures, and stops collecting once every job is accounted for or every worker has failed to initialize.
- Put the stop tokens (None jobs) for however many workers you have, regardless. Note that not all of these may be pulled from the queue (in case a worker failed to initialize).

By the way, your original code was nice and easy to work with. The random probabilities bit is pretty cool.
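One caveat worth flagging (my addition, not part of the answer above): string sentinels can in principle collide with legitimate results, since nothing stops a real result from equaling 'init failed' or 'job failed'. A more robust variant tags every message with an explicit status field and unpacks it in the parent; a minimal sketch of that idea, with flaky_work standing in for the real do_work():

import multiprocessing as mp
import random

def flaky_work(arg):
    # Placeholder for the real job; may raise
    if random.random() < 0.3:
        raise Exception("Job failed")
    return arg * arg

def worker(jobs_queue, result_queue):
    for job in iter(jobs_queue.get, None):
        try:
            # Tag every message with a status instead of a magic string
            result_queue.put(('ok', flaky_work(job)))
        except Exception:
            result_queue.put(('failed', job))

if __name__ == '__main__':
    jobs, results = mp.Queue(), mp.Queue()
    mp.Process(target=worker, args=(jobs, results)).start()
    for j in range(5):
        jobs.put(j)
    jobs.put(None)
    for _ in range(5):
        status, payload = results.get()  # unpack the tag, then count
        print status, payload

With tuples, the parent branches on status alone, and the payload can be any picklable object without risk of being mistaken for a failure marker.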
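As an aside (again my addition, not from the answer above): if the workers don't need the per-process init state, multiprocessing.Pool with apply_async sidesteps the manual sentinel accounting entirely, because AsyncResult.get() re-raises the worker's exception in the parent. A minimal sketch under that assumption:

import multiprocessing as mp
import random

def do_work(arg):
    # Stand-in for the real job; may raise
    if random.random() < 0.3:
        raise Exception("Job failed")
    return "processed %d" % arg

if __name__ == '__main__':
    pool = mp.Pool(5)
    # Submit all jobs; each AsyncResult remembers success or failure
    async_results = [pool.apply_async(do_work, (j,)) for j in range(30)]
    pool.close()
    for ar in async_results:
        try:
            print ar.get()  # re-raises the worker's exception here
        except Exception as e:
            print "!Job failed: %s" % e
    pool.join()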