Multiprocessing Pause-Restart functionality using `event`

I am using the code posted below to add pause/restart functionality to a multiprocessing Pool.

I would appreciate it if you could explain why the event variable has to be passed as an argument to the setup() function, and why a global variable unpaused is then declared inside setup() and set to the same event:

def setup(event):
    global unpaused
    unpaused = event

I would also like to understand the logic behind the following declaration:

pool=mp.Pool(2, setup, (event,))

The first argument is the number of worker processes the Pool should use. The second argument is the setup() function mentioned above.

Why couldn't it all be accomplished like this:

global event
event=mp.Event()
pool = mp.Pool(processes=2)

And every time we need to pause or restart a job, we would just use:

To pause:

event.clear()

To restart:

event.set()

Why do we need the global variable unpaused? I don't get it! Please advise.


import time
import multiprocessing as mp

def myFunct(arg):
    proc=mp.current_process()
    print 'starting:', proc.name, proc.pid,'...\n'
    for i in range(110):
        for n in range(500000):
            pass
    print '\t ...', proc.name, proc.pid, 'completed\n'

def setup(event):
    global unpaused
    unpaused = event

def pauseJob():
    event.clear()

def continueJob():
    event.set()


event=mp.Event()

pool=mp.Pool(2, setup, (event,))
pool.map_async(myFunct, [1,2,3])

event.set()

pool.close()
pool.join()
asked Jun 07 '14 by alphanumeric

1 Answer

You're misunderstanding how Event works. But first, I'll cover what setup is doing.

The setup function is executed in each child process of the pool as soon as that process is started. So you're setting a global variable called unpaused inside each worker process to be the same multiprocessing.Event object you created in your main process. Each sub-process ends up with a global variable that's a reference to the same multiprocessing.Event object, which allows you to signal your child processes from the main process, just like you want. See this example, which uses the name event for the global:

import multiprocessing

event = None
def my_setup(event_):
    global event
    event = event_
    print "event is %s in child" % event


if __name__ == "__main__":
    event = multiprocessing.Event()
    p = multiprocessing.Pool(2, my_setup, (event,))
    print "event is %s in parent" % event
    p.close()
    p.join()

Output:

dan@dantop2:~$ ./mult.py 
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in child
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in child
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in parent

As you can see, it's the same event in the two child processes as well as the parent. Just like you want.

However, passing event to setup actually isn't necessary. You can just inherit the event instance from the parent process:

import multiprocessing

event = None

def my_worker(num):
    print "event is %s in child" % event

if __name__ == "__main__":
    event = multiprocessing.Event()
    pool = multiprocessing.Pool(2)
    pool.map_async(my_worker, [i for i in range(pool._processes)]) # Just call my_worker for every process in the pool.

    pool.close()
    pool.join()
    print "event is %s in parent" % event

Output:

dan@dantop2:~$ ./mult.py 
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in child
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in child
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in parent

This is a lot simpler, and is the preferred way to pass a semaphore between parent and child. In fact, if you were to try to pass the event directly to a worker function, you'd get an error:

RuntimeError: Semaphore objects should only be shared between processes through inheritance
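The same restriction can be seen without a Pool at all: arguments to worker functions are pickled before being sent to the workers, and multiprocessing's synchronization primitives deliberately refuse to be pickled outside of process creation. A minimal sketch of this (written for Python 3; the exact wording of the error names whichever internal object, Semaphore, Lock, or Condition, gets pickled first, so it varies slightly between versions):

```python
import multiprocessing
import pickle

if __name__ == "__main__":
    event = multiprocessing.Event()
    try:
        # This is what Pool does to every task argument under the hood;
        # Event refuses it and asks to be inherited instead.
        pickle.dumps(event)
    except RuntimeError as e:
        print("RuntimeError:", e)
```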

Now, back to how you're misunderstanding the way Event works. Event is meant to be used like this:

import time
import multiprocessing

def event_func(num):
    print '\t%r is waiting' % multiprocessing.current_process()
    event.wait()
    print '\t%r has woken up' % multiprocessing.current_process()

if __name__ == "__main__":
    event = multiprocessing.Event()

    pool = multiprocessing.Pool()
    a = pool.map_async(event_func, [i for i in range(pool._processes)])

    print 'main is sleeping'
    time.sleep(2)

    print 'main is setting event'
    event.set()

    pool.close()
    pool.join()

Output:

main is sleeping
    <Process(PoolWorker-1, started daemon)> is waiting
    <Process(PoolWorker-2, started daemon)> is waiting
    <Process(PoolWorker-4, started daemon)> is waiting
    <Process(PoolWorker-3, started daemon)> is waiting
main is setting event
    <Process(PoolWorker-2, started daemon)> has woken up
    <Process(PoolWorker-1, started daemon)> has woken up
    <Process(PoolWorker-4, started daemon)> has woken up
    <Process(PoolWorker-3, started daemon)> has woken up

As you can see, the child processes need to explicitly call event.wait() in order to be paused. They get unpaused when event.set() is called in the main process. Right now none of your workers call event.wait, so none of them can ever be paused. I suggest you take a look at the docs for threading.Event, which multiprocessing.Event replicates.

answered Nov 18 '22 by dano