
python multiprocessing queue error

I have this Python code that reads a file, processes it, and writes the results in parallel:

import csv
import os
import multiprocessing as mp
from io import StringIO

import validation  # my own module (not shown here)

def line_chunker(path):
    """
    Reads a file in chunks and yields each chunk.
    Each chunk is guaranteed to end at a line ending (EOL).
    Each chunk is returned as a single string.

    The number of chunks the file is split into is roughly equal to the
    number of CPU cores available
    """
    size = os.path.getsize(path)
    cores = mp.cpu_count()
    chunksize = size // cores  # integer division (plain / gives a float on Python 3)

    with open(path) as f:
        f.readline()  # skip header
        while True:
            part = f.readlines(chunksize)  # whole lines, about chunksize bytes
            if not part:
                break
            yield "".join(part)

def _validate(chunk, outq):
    """ Performs format validation on a given chunk of a csv file """
    rows = csv.reader(StringIO(chunk))
    vld = validation.RowValidator(rows)
    vld.check_rows()
    outq.put(vld.errors)

def _write(outq):
    """Writes lines in the outq to a text file """
    outfile = open("C:/testoutput.txt", "w")
    while True:
        result = outq.get()
        if result is None:
            outfile.close()
            break
        else:
            for line in result:
                outfile.write(line)
                outfile.write("\n")

def validate_pll(path):    
    """ Perform validation in parallel """

    pool = mp.Pool()
    outq = mp.Manager().Queue(maxsize = 8)

    writer = mp.Process(target = _write, args = (outq,))
    writer.start()
    for chunk in line_chunker(path):
        pool.apply_async(_validate, (chunk, outq))

    pool.close()
    pool.join()

It reads the file in chunks and submits each chunk to a pool of worker processes. Each worker puts its results on a queue, which a separate writer process drains.

The code runs, but after completion I get an odd EOFError.

I suspect it is because I do not call writer.join(), but if I add this line, like so:

def validate_pll(path):    
    """ Perform validation in parallel """

    pool = mp.Pool()
    outq = mp.Manager().Queue(maxsize = 8)

    writer = mp.Process(target = _write, args = (outq,))
    writer.start()
    for chunk in line_chunker(path):
        pool.apply_async(_validate, (chunk, outq))

    pool.close()
    pool.join()
    writer.join()

The code simply hangs. Any idea what I am doing wrong?

The error message given is:

Process Process-10:
Traceback (most recent call last):
    File C\Anaconda\lib\multiprocessing\process.py, line 258, in _bootstrap
        self.run()
    File C\Anaconda\lib\multiprocessing\process.py line 114, in run
       self._target(*self._args, **self._kwargs)
    File C:\SVN\PortfolioInspector\trunk\parallel.py, line 114 in _write
       result = outq.get()
    File "(string)", line 2, in get
    File C\Anaconda\lib\multiprocessing\managers.py, line 759, in _callmethod
        kind, result = conn.recv()
EOFError

jramm asked Feb 11 '23 23:02


1 Answer

The writer process (the one running _write) is still blocked in outq.get() when the main process ends. That call holds a blocking connection to the Manager process that manages the shared Queue. When the main process completes its execution, the Manager process shuts down, the connection the writer opened receives EOF, and you see that exception.

To fix it, you need to tell the writer to shut down before the main process ends (and, by extension, before the Manager process shuts down). You actually already have a mechanism in place for this, you're just not using it: put a None on outq, and _write will do an orderly shutdown. Do that after pool.join(), so that every result is already on the queue, and before writer.join(), and things should work fine. The missing None is also why adding writer.join() on its own hangs: the writer never leaves outq.get().

dano answered Feb 14 '23 12:02