Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Writing to a file with multiprocessing

I'm having the following problem in python.

I need to do some calculations in parallel whose results I need to be written sequentially in a file. So I created a function that receives a multiprocessing.Queue and a file handle, do the calculation and print the result in the file:

import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation   

# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file

def work(queue, fh):
while True:
    try:
        parameter = queue.get(block = False)
        result = doCalculation(parameter) 
        print >>fh, string
    except:
        break


if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fh = open("foo", "w")
    workQueue = Queue()
    parList = # list of conditions for which I want to run doCalculation()
    for x in parList:
        workQueue.put(x)
    processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)]
    for p in processes:
       p.start()
    for p in processes:
       p.join()
    fh.close()

But the file ends up empty after the script runs. I tried to change the worker() function to:

def work(queue, filename):
while True:
    try:
        fh = open(filename, "a")
        parameter = queue.get(block = False)
        result = doCalculation(parameter) 
        print >>fh, string
        fh.close()
    except:
        break

and pass the filename as parameter. Then it works as I intended. When I try to do the same thing sequentially, without multiprocessing, it also works normally.

Why it didn't worked in the first version? I can't see the problem.

Also: can I guarantee that two processes won't try to write the file simultaneously?


EDIT:

Thanks. I got it now. This is the working version:

import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform

def doCalculation(par):
    t = uniform(0,2)
    sleep(t)
    return par * par  # just to simulate some calculation

def feed(queue, parlist):
    for par in parlist:
            queue.put(par)

def calc(queueIn, queueOut):
    while True:
        try:
            par = queueIn.get(block = False)
            print "dealing with ", par, "" 
            res = doCalculation(par)
            queueOut.put((par,res))
        except:
            break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
        try:
            par, res = queue.get(block = False)
            print >>fhandle, par, res
        except:
            break
    fhandle.close()

if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fname = "foo"
    workerQueue = Queue()
    writerQueue = Queue()
    parlist = [1,2,3,4,5,6,7,8,9,10]
    feedProc = Process(target = feed , args = (workerQueue, parlist))
    calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target = write, args = (writerQueue, fname))


    feedProc.start()
    for p in calcProc:
        p.start()
    writProc.start()

    feedProc.join ()
    for p in calcProc:
        p.join()
    writProc.join ()
like image 243
Rafael S. Calsaverini Avatar asked Jun 29 '11 17:06

Rafael S. Calsaverini


People also ask

How do you pass multiple arguments in a multiprocessing pool?

Use Pool.The multiprocessing pool starmap() function will call the target function with multiple arguments. As such it can be used instead of the map() function. This is probably the preferred approach for executing a target function in the multiprocessing pool that takes multiple arguments.

When would you use a multiprocessing pool?

Use the multiprocessing pool if your tasks are independent. This means that each task is not dependent on other tasks that could execute at the same time. It also may mean tasks that are not dependent on any data other than data provided via function arguments to the task.

Does multiprocessing work in airflow?

It uses the multiprocessing Python library and queues to parallelize the execution of tasks.

Does Numba work with multiprocessing?

You should not use the multiprocessing package in a Numba code. This will simply not work (Numba will use a fallback implementation which is the basic Python one).


2 Answers

You really should use two queues and three separate kinds of processing.

  1. Put stuff into Queue #1.

  2. Get stuff out of Queue #1 and do calculations, putting stuff in Queue #2. You can have many of these, since they get from one queue and put into another queue safely.

  3. Get stuff out of Queue #2 and write it to a file. You must have exactly 1 of these and no more. It "owns" the file, guarantees atomic access, and absolutely assures that the file is written cleanly and consistently.

like image 141
S.Lott Avatar answered Nov 02 '22 22:11

S.Lott


If anyone is looking for a simple way to do the same, this can help you. I don't think there are any disadvantages to doing it in this way. If there are, please let me know.

import multiprocessing 
import re

def mp_worker(item):
    # Do something
    return item, count

def mp_handler():
    cpus = multiprocessing.cpu_count()
    p = multiprocessing.Pool(cpus)
    # The below 2 lines populate the list. This listX will later be accessed parallely. This can be replaced as long as listX is passed on to the next step.
    with open('ExampleFile.txt') as f:
        listX = [line for line in (l.strip() for l in f) if line]
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, listX):
            # (item, count) tuples from worker
            f.write('%s: %d\n' % result)

if __name__=='__main__':
    mp_handler()

Source: Python: Writing to a single file with queue while using multiprocessing Pool

like image 23
Menezes Sousa Avatar answered Nov 02 '22 23:11

Menezes Sousa