Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python Multiprocessing using Queue to write to same file

I know there are many post on Stack Exchange related to writing results from multiprocessing to single file and I have developed my code after reading only those posts. What I am trying to achieve is that run 'RevMapCoord' function in parallel and write its result in one single file using multiprocess.queue. But I am having problem while queuing my job. My Code:

def RevMapCoord(list):
    "Read a file, Find String and Do something"

def feed(queue, parlist):
    for par in parlist:
        print ('Echo from Feeder: %s' % (par))
        queue.put(par)
    print ('**Feeder finished queing**')

def calc(queueIn, queueOut):
     print ('Worker function started')
     while True:
         try:
             par = queueIn.get(block = False)
             res = RevMapCoord(final_res)
             queueOut.put((par,res))
         except:
             break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
         try:
            par, res = queue.get(block = False)
            print >>fhandle, par, res
         except:
            break
    fhandle.close()


feedProc = Process(target = feed , args = (workerQueue, final_res))
calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nproc)]
writProc = Process(target = write, args = (writerQueue, sco_inp_extend_geno))

feedProc.start()
print ('Feeder is joining')
feedProc.join ()
for p in calcProc:
    p.start()
for p in calcProc:
    p.join()
writProc.start()
writProc.join ()

When I run this code script stucks at "feedProc.start()" step. The last few output lines from screen shows print statement from the end of "feedProc.start()":

Echo from Feeder: >AK779,AT61680,50948-50968,50959,6,0.406808,Ashley,Dayne
Echo from Feeder: >AK832,AT30210,1091-1111,1102,7,0.178616,John,Caine
**Feeder finished queing**

But hangs before executing next line "feedProc.join ()". Code gives no error and keep on running but doing nothing(hangs). Please tell me what mistake I am making.

like image 447
Bade Avatar asked Mar 20 '13 17:03

Bade


People also ask

How does Python multiprocessing queue work?

The multiprocessing. Queue provides a first-in, first-out FIFO queue, which means that the items are retrieved from the queue in the order they were added. The first items added to the queue will be the first items retrieved. This is opposed to other queue types such as last-in, first-out and priority queues.

Is multiprocessing queue thread safe?

This includes queues in the multiprocessing.Queues are thread and process safe. This means that processes may get() and put() items from and to the queue concurrently without fear of a race condition. You can learn more about to how to use queues with multiple processes in the tutorial: Multiprocessing Queue in Python.

How do you pass multiple arguments in multiprocessing Python?

Use Pool. The multiprocessing pool starmap() function will call the target function with multiple arguments. As such it can be used instead of the map() function. This is probably the preferred approach for executing a target function in the multiprocessing pool that takes multiple arguments.

Does Python multiprocessing use multiple cores?

Python processes typically use a single thread because of the GIL. Despite the GIL, libraries that perform computationally heavy tasks like numpy, scipy and pytorch utilise C-based implementations under the hood, allowing the use of multiple cores.


1 Answers

I think you should slim your example to the basics. For example:

from multiprocessing import Process, Queue

def f(q):
    q.put('Hello')
    q.put('Bye')
    q.put(None)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    with open('file.txt', 'w') as fp:
        while True:
            item = q.get()
            print(item)
            if item is None:
                break
            fp.write(item)
    p.join()

Here I have two process (the main process, a p). p puts strings in a queue which are retrieved by the main process. When the main process finds None (a sentinel that I am using to indicate: "I am done" it breaks the loop.

Extending this to many process (or threads) is trivial.

like image 59
Hernan Avatar answered Oct 20 '22 20:10

Hernan