I am using the code posted below to enable pause/restart functionality for a multiprocessing Pool. I would appreciate it if you could explain why the event variable has to be passed as an argument to the setup() function, and why a global variable unpaused is then declared inside the scope of setup() and set to be the same as event:
def setup(event):
    global unpaused
    unpaused = event
I would also like to understand the logic behind the following declaration:

pool = mp.Pool(2, setup, (event,))

The first argument is the number of worker processes for the Pool to use. The second argument is the setup() function mentioned above, and the third is the tuple of arguments passed to it.
Why couldn't it all be accomplished like this:

global event
event = mp.Event()
pool = mp.Pool(processes=2)

And every time we need to pause or restart a job, we would just use:

To pause:

event.clear()

To restart:

event.set()

Why would we need the global variable unpaused? I don't get it! Please advise.
import time
import multiprocessing as mp

def myFunct(arg):
    proc = mp.current_process()
    print('starting:', proc.name, proc.pid, '...\n')
    for i in range(110):
        for n in range(500000):
            pass
    print('\t ...', proc.name, proc.pid, 'completed\n')

def setup(event):
    global unpaused
    unpaused = event

def pauseJob():
    event.clear()

def continueJob():
    event.set()

event = mp.Event()
pool = mp.Pool(2, setup, (event,))
pool.map_async(myFunct, [1, 2, 3])
event.set()
pool.close()
pool.join()
You're misunderstanding how Event works. But first, I'll cover what setup is doing.

The setup function is executed in each child process inside the pool as soon as it is started. So, you're setting a global variable called event inside each process to be the same multiprocessing.Event object you created in your main process. You end up with each sub-process having a global variable called event that's a reference to the same multiprocessing.Event object. This will allow you to signal your child processes from the main process, just like you want. See this example:
import multiprocessing

event = None

def my_setup(event_):
    global event
    event = event_
    print("event is %s in child" % event)

if __name__ == "__main__":
    event = multiprocessing.Event()
    p = multiprocessing.Pool(2, my_setup, (event,))
    print("event is %s in parent" % event)
    p.close()
    p.join()
Output:
dan@dantop2:~$ ./mult.py
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in child
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in child
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in parent
As you can see, it's the same event in the two child processes as well as the parent, just like you want.

However, passing event to setup actually isn't necessary. You can just inherit the event instance from the parent process:
import multiprocessing

event = None

def my_worker(num):
    print("event is %s in child" % event)

if __name__ == "__main__":
    event = multiprocessing.Event()
    pool = multiprocessing.Pool(2)
    # Just call my_worker once for every process in the pool.
    pool.map_async(my_worker, [i for i in range(pool._processes)])
    pool.close()
    pool.join()
    print("event is %s in parent" % event)
Output:
dan@dantop2:~$ ./mult.py
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in child
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in child
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in parent
This is a lot simpler, and is the preferred way to share a synchronization primitive between parent and child. In fact, if you were to try to pass the event directly to a worker function, you'd get an error:
RuntimeError: Semaphore objects should only be shared between processes through inheritance
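To see that error for yourself, here is a minimal sketch (the worker name wait_on is hypothetical) that passes the Event as an ordinary task argument; note that the exact wording of the message varies between Python versions:

```python
import multiprocessing

# Hypothetical worker that takes the event as a regular argument.
def wait_on(ev):
    ev.wait()

if __name__ == "__main__":
    event = multiprocessing.Event()
    pool = multiprocessing.Pool(2)
    try:
        # Task arguments must be pickled to cross the process boundary,
        # which multiprocessing's synchronization primitives forbid.
        pool.map(wait_on, [event, event])
    except RuntimeError as err:
        print("RuntimeError:", err)
    finally:
        pool.close()
        pool.join()
```

The failure happens at pickling time, before the worker ever runs, which is why the inheritance approach above is required.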
Now, back to how you're misunderstanding the way Event works. Event is meant to be used like this:
import time
import multiprocessing

def event_func(num):
    print('\t%r is waiting' % multiprocessing.current_process())
    event.wait()
    print('\t%r has woken up' % multiprocessing.current_process())

if __name__ == "__main__":
    event = multiprocessing.Event()
    pool = multiprocessing.Pool()
    a = pool.map_async(event_func, [i for i in range(pool._processes)])
    print('main is sleeping')
    time.sleep(2)
    print('main is setting event')
    event.set()
    pool.close()
    pool.join()
Output:
main is sleeping
<Process(PoolWorker-1, started daemon)> is waiting
<Process(PoolWorker-2, started daemon)> is waiting
<Process(PoolWorker-4, started daemon)> is waiting
<Process(PoolWorker-3, started daemon)> is waiting
main is setting event
<Process(PoolWorker-2, started daemon)> has woken up
<Process(PoolWorker-1, started daemon)> has woken up
<Process(PoolWorker-4, started daemon)> has woken up
<Process(PoolWorker-3, started daemon)> has woken up
As you can see, the child processes need to explicitly call event.wait() in order to be paused. They get unpaused when event.set() is called in the main process. Right now none of your workers is calling event.wait(), so none of them can ever be paused. I suggest you take a look at the docs for threading.Event, whose API multiprocessing.Event replicates.