
"Can't pickle <type '_csv.reader'>" error when using multiprocessing on Windows

Tags:

python

I'm writing a multiprocessing program to handle a large .CSV file in parallel, using Windows.

I found this excellent example for a similar problem. When running it under Windows, I receive an error that csv.reader is not picklable.

I suppose I can open the CSV file in the reader sub-process and just send the file name to it from the parent process. However, I would like to pass an already opened CSV file (like the code is supposed to do), with a specific state, that is, really use a shared object.

Any idea how to do that under Windows or what's missing there?

This is the code (I'm reposting for ease of reading):

"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser

class CSVWorker(object):
    def __init__(self, numprocs, infile, outfile):
        self.numprocs = numprocs
        self.infile = open(infile)
        self.outfile = outfile
        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()
        self.outq = multiprocessing.Queue()

        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
        self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
        self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
                        for i in range(self.numprocs)]

        self.pin.start()
        self.pout.start()
        for p in self.ps:
            p.start()

        self.pin.join()
        i = 0
        for p in self.ps:
            p.join()
            print "Done", i
            i += 1

        self.pout.join()
        self.infile.close()

    def parse_input_csv(self):
        """Parses the input CSV and yields tuples with the index of the row
        as the first element, and the integers of the row as the second
        element.

        The index is zero-index based.

        The data is then sent over inqueue for the workers to do their
        thing.  At the end the input thread sends a 'STOP' message for each
        worker.
        """
        for i, row in enumerate(self.in_csvfile):
            row = [int(entry) for entry in row]
            self.inq.put((i, row))

        for i in range(self.numprocs):
            self.inq.put("STOP")

    def sum_row(self):
        """
        Workers. Consume inq and produce answers on outq
        """
        tot = 0
        for i, row in iter(self.inq.get, "STOP"):
            self.outq.put((i, sum(row)))
        self.outq.put("STOP")

    def write_output_csv(self):
        """
        Open outgoing csv file then start reading outq for answers
        Since I chose to make sure output was synchronized to the input there
        is some extra goodies to do that.

        Obviously your input has the original row number so this is not
        required.
        """
        cur = 0
        stop = 0
        buffer = {}
        # For some reason csv.writer works badly across threads so open/close
        # and use it all in the same thread or else you'll have the last
        # several rows missing
        outfile = open(self.outfile, "w")
        self.out_csvfile = csv.writer(outfile)

        #Keep running until we see numprocs STOP messages
        for works in range(self.numprocs):
            for i, val in iter(self.outq.get, "STOP"):
                # verify rows are in order, if not save in buffer
                if i != cur:
                    buffer[i] = val
                else:
                    #if yes are write it out and make sure no waiting rows exist
                    self.out_csvfile.writerow( [i, val] )
                    cur += 1
                    while cur in buffer:
                        self.out_csvfile.writerow([ cur, buffer[cur] ])
                        del buffer[cur]
                        cur += 1

        outfile.close()

def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")

    c = CSVWorker(opts.numprocs, args[0], args[1])

if __name__ == '__main__':
    main(sys.argv[1:])

When running under Windows, this is the error I receive:

Traceback (most recent call last):
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 130, in <module>
    main(sys.argv[1:])
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 127, in main
    c = CSVWorker(opts.numprocs, args[0], args[1])
  File "C:\Users\ron.berman\Documents\Attribution\ubrShapley\test.py", line 44, in __init__
    self.pin.start()
  File "C:\Python27\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "C:\Python27\lib\multiprocessing\forking.py", line 271, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Python27\lib\multiprocessing\forking.py", line 193, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Python27\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\multiprocessing\forking.py", line 66, in dispatcher
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 401, in save_reduce
    save(args)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 548, in save_tuple
    save(element)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 396, in save_reduce
    save(cls)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 753, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle <type '_csv.reader'>: it's not the same object as _csv.reader
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python27\lib\multiprocessing\forking.py", line 374, in main
    self = load(from_parent)
  File "C:\Python27\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Python27\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Python27\lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError
Ron asked Dec 15 '11
2 Answers

The problem you're running into is caused by using methods of the CSVWorker class as the process targets. That class has members that cannot be pickled; in particular, those open files are never going to work.

What you want to do is break that class into two classes: one that coordinates all of the worker subprocesses, and one that actually does the computational work. The worker processes should take filenames as arguments and open the individual files as needed, or at least wait until their worker methods are invoked and only open files then. They can also take multiprocessing.Queue instances as arguments or as instance members; those are safe to pass around.

To a certain extent, you already do this: your write_output_csv method opens its file in the subprocess, but your parse_input_csv method expects to find an already opened and prepared file as an attribute of self. Do it the other way consistently and you should be in good shape.
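As a minimal sketch of that split (written for Python 3; the function and parameter names here are hypothetical, not part of the original code), the process targets become plain functions that receive only picklable arguments, a file name and queues, and open the file inside the child:

```python
import csv
import multiprocessing

def parse_input_csv(infile_name, inq, numprocs):
    # Open the CSV inside the child process: only the picklable file
    # name crosses the process boundary, never the reader object.
    with open(infile_name, newline="") as infile:
        for i, row in enumerate(csv.reader(infile)):
            inq.put((i, [int(entry) for entry in row]))
    # One sentinel per worker so every consumer terminates.
    for _ in range(numprocs):
        inq.put("STOP")

def sum_row(inq, outq):
    # Consume (index, row) pairs and emit (index, sum) pairs.
    for i, row in iter(inq.get, "STOP"):
        outq.put((i, sum(row)))
    outq.put("STOP")
```

The coordinator would then start these with, e.g., `multiprocessing.Process(target=parse_input_csv, args=(filename, inq, numprocs))`, so nothing unpicklable ever appears in the `args` tuple.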

SingleNegationElimination answered Nov 27 '22


Since multiprocessing depends on serializing and de-serializing objects when passing them as parameters between processes, and your code relies on passing an instance of CSVWorker across processes (the instance denoted as 'self'), you got this error, as neither csv readers nor open files can be pickled.
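You can reproduce the failure in isolation, without multiprocessing at all, by pickling a reader directly. (On Python 3 the exception is a TypeError; Python 2.7, as in the traceback above, raises pickle.PicklingError instead.)

```python
import csv
import io
import pickle

# A csv.reader wraps C-level iterator state that pickle cannot
# serialize, so pickling it fails immediately.
reader = csv.reader(io.StringIO("1,2,3\n"))
try:
    pickle.dumps(reader)
except (TypeError, pickle.PicklingError) as exc:
    print("not picklable:", exc)
```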

You mentioned your CSV files are large, so I don't think reading all the data into a list would be a solution for you. You have to think of a way of passing one line from your input CSV to each worker at a time, retrieving a processed line from each worker, and performing all the I/O in the main process.

It looks like multiprocessing.Pool would be a better way of writing your application. Check the multiprocessing documentation at http://docs.python.org/library/multiprocessing.html and try using a process pool with pool.map to process your CSV. It also takes care of preserving the order of the results, which will eliminate a lot of the complicated logic in your code.
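A short Python 3 sketch of that approach (function names are illustrative, not from the original code): all file I/O stays in the main process, workers receive plain lists of strings, and `pool.map` returns the sums in input order. On Windows this must be called from under an `if __name__ == "__main__":` guard.

```python
import csv
import multiprocessing

def sum_row(row):
    # Workers receive a picklable list of strings, not a reader.
    return sum(int(entry) for entry in row)

def process_csv(in_name, out_name, numprocs):
    # Only the main process touches the files; pool.map preserves
    # the input order, so no reordering buffer is needed.
    with open(in_name, newline="") as infile:
        with multiprocessing.Pool(numprocs) as pool:
            sums = pool.map(sum_row, csv.reader(infile))
    with open(out_name, "w", newline="") as outfile:
        writer = csv.writer(outfile)
        for i, s in enumerate(sums):
            writer.writerow([i, s])
```

Note that `pool.map` materializes the whole input; for a file too large for that, `pool.imap` streams rows through the pool while still preserving order.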

jsbueno answered Nov 27 '22