I know there are many posts on Stack Exchange about writing results from multiprocessing to a single file, and I developed my code after reading those posts. What I am trying to achieve is to run the 'RevMapCoord' function in parallel and write its results to one single file using a multiprocessing.Queue. But I am having a problem queuing my jobs. My code:
from multiprocessing import Process, Queue

def RevMapCoord(par):
    "Read a file, find a string and do something"

def feed(queue, parlist):
    for par in parlist:
        print('Echo from Feeder: %s' % (par))
        queue.put(par)
    print('**Feeder finished queing**')

def calc(queueIn, queueOut):
    print('Worker function started')
    while True:
        try:
            par = queueIn.get(block=False)
            res = RevMapCoord(par)
            queueOut.put((par, res))
        except:
            break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
        try:
            par, res = queue.get(block=False)
            print(par, res, file=fhandle)
        except:
            break
    fhandle.close()

feedProc = Process(target=feed, args=(workerQueue, final_res))
calcProc = [Process(target=calc, args=(workerQueue, writerQueue)) for i in range(nproc)]
writProc = Process(target=write, args=(writerQueue, sco_inp_extend_geno))

feedProc.start()
print('Feeder is joining')
feedProc.join()
for p in calcProc:
    p.start()
for p in calcProc:
    p.join()
writProc.start()
writProc.join()
When I run this code the script gets stuck at the "feedProc.start()" step. The last few lines of output show the print statements from the feeder:
Echo from Feeder: >AK779,AT61680,50948-50968,50959,6,0.406808,Ashley,Dayne
Echo from Feeder: >AK832,AT30210,1091-1111,1102,7,0.178616,John,Caine
**Feeder finished queing**
But it hangs before executing the next line, "feedProc.join()". The code gives no error and keeps running, but does nothing (hangs). Please tell me what mistake I am making.
multiprocessing.Queue provides a first-in, first-out (FIFO) queue: items are retrieved in the order they were added, so the first items put on the queue are the first items retrieved. This is in contrast to other queue types such as last-in, first-out and priority queues.
multiprocessing.Queue is also thread- and process-safe, which means that processes may get() and put() items concurrently without fear of a race condition.
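This points at the likely cause of the hang: a process that has put items on a multiprocessing.Queue will not terminate until all buffered items have been flushed to the underlying pipe, so calling feedProc.join() before any consumer has been started deadlocks as soon as the pipe buffer fills. The non-blocking queueIn.get(block=False) calls are also fragile: a worker that momentarily finds the queue empty exits for good. Below is a minimal sketch of one way to restructure the pipeline; do_work, the queue names and the file name are illustrative stand-ins, not the asker's real identifiers. The key changes are starting the consumers first, feeding afterwards, and using None sentinels instead of non-blocking gets:

from multiprocessing import Process, Queue

def do_work(item):                  # hypothetical stand-in for RevMapCoord
    return item.upper()

def feed(queue_in, items, nproc):
    for item in items:
        queue_in.put(item)
    for _ in range(nproc):          # one stop signal per worker
        queue_in.put(None)

def calc(queue_in, queue_out):
    while True:
        item = queue_in.get()       # blocking get: no race with the feeder
        if item is None:
            queue_out.put(None)     # pass the stop signal on to the writer
            break
        queue_out.put((item, do_work(item)))

def write(queue_out, fname, nproc):
    done = 0
    with open(fname, 'w') as fh:
        while done < nproc:         # stop once every worker has signalled
            got = queue_out.get()
            if got is None:
                done += 1
            else:
                par, res = got
                fh.write('%s %s\n' % (par, res))

if __name__ == '__main__':
    nproc = 4
    queue_in, queue_out = Queue(), Queue()
    workers = [Process(target=calc, args=(queue_in, queue_out))
               for _ in range(nproc)]
    writer = Process(target=write, args=(queue_out, 'out.txt', nproc))
    for w in workers:               # start consumers before feeding
        w.start()
    writer.start()
    feed(queue_in, ['a', 'b', 'c'], nproc)
    for w in workers:
        w.join()
    writer.join()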
Alternatively, use a Pool. The multiprocessing pool's starmap() function calls the target function with multiple arguments, so it can be used in place of map(). It is probably the preferred approach when the target function executed in the pool takes multiple arguments.
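As a rough sketch (the worker and its second argument are hypothetical here, since RevMapCoord's real signature is not shown), the whole feeder/worker/writer machinery collapses to a few lines, with the main process acting as the single writer:

from multiprocessing import Pool

def rev_map_coord(record, genome):                   # hypothetical two-argument worker
    return '%s mapped against %s' % (record, genome)

if __name__ == '__main__':
    records = ['rec1', 'rec2', 'rec3']
    args = [(rec, 'hg19') for rec in records]        # one argument tuple per call
    with Pool(processes=4) as pool:
        results = pool.starmap(rev_map_coord, args)  # results return in input order
    with open('out.txt', 'w') as fh:                 # single writer: no queue needed
        for res in results:
            fh.write(res + '\n')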
A CPython process usually gets no parallelism from threads because of the GIL, which lets only one thread execute Python bytecode at a time. Despite the GIL, libraries that perform computationally heavy tasks, such as numpy, scipy and pytorch, use C implementations under the hood that can release the GIL and make use of multiple cores.
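As a small illustration of that point (matrix size and worker count are arbitrary), threads multiplying large numpy arrays can overlap on several cores because numpy releases the GIL inside its compiled kernels:

from concurrent.futures import ThreadPoolExecutor
import numpy as np

a = np.random.rand(2000, 2000)

def heavy(_):
    # The matrix product runs in compiled code with the GIL released,
    # so several of these calls can execute in parallel on different cores.
    return (a @ a).sum()

with ThreadPoolExecutor(max_workers=4) as ex:
    print(list(ex.map(heavy, range(4))))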
I think you should slim your example to the basics. For example:
from multiprocessing import Process, Queue

def f(q):
    q.put('Hello')
    q.put('Bye')
    q.put(None)              # sentinel: tells the reader we are done

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    with open('file.txt', 'w') as fp:
        while True:
            item = q.get()   # blocks until an item is available
            print(item)
            if item is None:
                break
            fp.write(item)
    p.join()
Here I have two processes (the main process and p). p puts strings into a queue, which are retrieved by the main process. When the main process finds None (a sentinel that I am using to indicate "I am done"), it breaks the loop.
Extending this to many processes (or threads) is straightforward, as the sketch below shows.
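For example, here is the same pattern with several producers (names and counts are arbitrary): each producer sends its own None sentinel, and the main process stops reading once it has seen one sentinel per producer:

from multiprocessing import Process, Queue

def produce(q, label):
    for i in range(3):
        q.put('%s-%d\n' % (label, i))
    q.put(None)                          # this producer is done

if __name__ == '__main__':
    q = Queue()
    procs = [Process(target=produce, args=(q, 'p%d' % n)) for n in range(4)]
    for p in procs:
        p.start()
    finished = 0
    with open('file.txt', 'w') as fp:
        while finished < len(procs):     # expect one sentinel per producer
            item = q.get()
            if item is None:
                finished += 1
            else:
                fp.write(item)
    for p in procs:
        p.join()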