 

Python multiprocessing - Is it possible to introduce a fixed time delay between individual processes?

I have searched and cannot find an answer to this question elsewhere. Hopefully I haven't missed something.

I am trying to use Python multiprocessing to batch run some proprietary models in parallel. I have, say, 200 simulations, and I want to run them roughly 10-20 at a time. My problem is that the proprietary software crashes if two models start at the same (or nearly the same) time. I need to introduce a delay between the processes spawned by multiprocessing so that each new model run waits a little before starting.

So far, my solution has been to introduce a random time delay at the start of the child process before it fires off the model run. However, this only reduces the probability of any two runs starting at the same time, so I still run into problems when processing a large number of models. I therefore think the delay needs to be built into the multiprocessing part of the code, but I haven't been able to find any documentation or examples of this.

Edit: I am using Python 2.7

This is my code so far:

from time import sleep
import numpy as np
import subprocess
import multiprocessing

def runmodels(arg):
    # interim fix: a random delay of up to two minutes reduces, but does not
    # eliminate, the chance that two runs start at the same time
    sleep(np.random.rand() * 120)
    subprocess.call(arg) # this line actually fires off the model run

if __name__ == '__main__':    

    arguments = [big list of runs in here]

    count = 12
    pool = multiprocessing.Pool(processes=count)
    r = pool.imap_unordered(runmodels, arguments)
    pool.close()
    pool.join()
asked May 20 '15 by katamatsu

2 Answers

multiprocessing.Pool() already limits the number of processes running concurrently.

You could use a lock to separate the starting times of the processes (not tested):

import threading
import multiprocessing

def init(lock):
    global starting
    starting = lock

def run_model(arg):
    starting.acquire() # no other process can get it until it is released
    threading.Timer(1, starting.release).start() # release in a second
    # ... start your simulation here

if __name__ == "__main__":
    arguments = ...
    pool = multiprocessing.Pool(processes=12,
                                initializer=init, initargs=[multiprocessing.Lock()])
    for _ in pool.imap_unordered(run_model, arguments):
        pass
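
The threading.Timer here releases the lock one second after it is acquired, so the current worker can start its simulation immediately while the next worker waits out the stagger interval. If you would rather avoid the helper thread, the worker can hold the lock for the interval itself before releasing it, at the cost of delaying its own start by that second. A minimal, untested sketch of that variant (my own, assuming the same init initializer and the subprocess.call from the question):

import time
import subprocess

def run_model(arg):
    starting.acquire()  # serialize the start-up window
    try:
        time.sleep(1)   # hold the lock for the full stagger interval
    finally:
        starting.release()
    subprocess.call(arg)  # fire off the model run, as in the question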
answered by jfs

One way to do this is with threads and a semaphore:

from time import sleep
import subprocess
import threading


def runmodels(arg):
    subprocess.call(arg)
    sGlobal.release()  # free a slot for the next launch


if __name__ == '__main__':
    threads = []
    sGlobal = threading.Semaphore(12)  # at most 12 threads running at once
    arguments = [big list of runs in here]
    for arg in arguments:
        sGlobal.acquire()  # blocks while 12 runs are already in flight
        t = threading.Thread(target=runmodels, args=(arg,))
        threads.append(t)
        t.start()
        sleep(1)  # fixed one-second delay between successive starts

    for t in threads:
        t.join()
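
One robustness note (my own addition, not part of the original answer): if subprocess.call raises, the semaphore slot is never released and the launch loop can eventually block forever. A try/finally around the call guards against that:

def runmodels(arg):
    try:
        subprocess.call(arg)
    finally:
        sGlobal.release()  # always free the slot, even if the call raises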
answered by Quentin Rouland