Avoid global variables for unpicklable shared state among multiprocessing.Pool workers

I frequently find myself writing programs in Python that construct a large (megabytes) read-only data structure and then use that data structure to analyze a very large (hundreds of megabytes in total) list of small records. Each of the records can be analyzed in parallel, so a natural pattern is to set up the read-only data structure and assign it to a global variable, then create a multiprocessing.Pool (which implicitly copies the data structure into each worker process, via fork) and then use imap_unordered to crunch the records in parallel. The skeleton of this pattern tends to look like this:

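import csv
import multiprocessing

classifier = None  # assigned in classify() before the Pool forks its workers

def classify_row(row):
    # runs in a worker and reads the copy of the global inherited via fork
    return classifier.classify(row)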

def classify(classifier_spec, data_file):
    global classifier
    try:
        classifier = Classifier(classifier_spec)
        with open(data_file, "rt") as fp, \
             multiprocessing.Pool() as pool:
            rd = csv.DictReader(fp)
            yield from pool.imap_unordered(classify_row, rd)
    finally:
        classifier = None

I'm not happy with this because of the global variable and the implicit coupling between classify and classify_row. Ideally, I would like to write:

def classify(classifier_spec, data_file):
    classifier = Classifier(classifier_spec)
    with open(data_file, "rt") as fp, \
         multiprocessing.Pool() as pool:
        rd = csv.DictReader(fp)
        yield from pool.imap_unordered(classifier.classify, rd)

but this does not work, because the Classifier object usually contains objects that cannot be pickled (they are defined by extension modules whose authors didn't provide pickle support). I have also read that it would be really slow even if it did work, because the whole Classifier object would be pickled and copied into the worker processes along with every task, that is, on every invocation of the bound method.
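
Both problems can be confirmed without a pool, because pickling a bound method pickles the instance it is bound to. A minimal sketch, assuming a threading.Lock as a stand-in for an extension-module object without pickle support; UnpicklableClassifier, BigModel and the two helper functions are hypothetical names used only for illustration:

```python
import pickle
import threading


class UnpicklableClassifier:
    def __init__(self):
        # threading.Lock stands in for an extension object without pickle support
        self.lock = threading.Lock()

    def classify(self, row):
        return row


class BigModel:
    """Hypothetical picklable classifier holding a large read-only payload."""

    def __init__(self, n_bytes):
        self.table = bytes(n_bytes)

    def classify(self, row):
        return row


def try_pickle_bound_method():
    """Return the error message raised when pickling the unpicklable bound method."""
    try:
        pickle.dumps(UnpicklableClassifier().classify)
    except TypeError as exc:
        return str(exc)
    return None


def bound_method_payload_size(n_bytes):
    """Size of the pickle that would travel with each task sent to a worker."""
    return len(pickle.dumps(BigModel(n_bytes).classify))
```

With Pool.imap_unordered and its default chunksize of 1, a pickle of roughly that size would accompany every row.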

Is there a better alternative? I only care about 3.x.

zwol asked Oct 02 '18 01:10


1 Answer

If you want to use forking, I don't see a way around using a global. But I also don't see why you should feel bad about using a global in this case: you're not manipulating a global list from multiple threads or the like.

It's possible to cope with the ugliness in your example, though. You want to pass classifier.classify directly, but the Classifier object contains objects which cannot be pickled. For demonstration, here is a stand-in Classifier whose threading.Lock attribute can't be pickled:

import os
import csv
import uuid
from threading import Lock
from multiprocessing import Pool
from weakref import WeakValueDictionary

class Classifier:

    def __init__(self, spec):
        self.lock = Lock()  # unpicklable
        self.spec = spec

    def classify(self, row):
        return f'classified by pid: {os.getpid()} with spec: {self.spec}', row

I suggest we subclass Classifier and define __getstate__ and __setstate__ to enable pickling. Since you're using forking anyway, the only state it has to pickle is the information needed to look up the forked global instance. Then we just update the unpickled object's __dict__ with the __dict__ of the forked instance (which hasn't gone through the reduction of pickling), and the instance is complete again.
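
The core of this idea can be tried in a single process, where a plain module-level dict plays the role of the global that a forked worker would inherit. In this sketch _REGISTRY and Handle are hypothetical names; only a lookup key goes into the pickle, and __setstate__ copies the full __dict__ back from the registered instance:

```python
import pickle
import threading

# Hypothetical registry; after a fork, each worker would inherit a copy of it.
_REGISTRY = {}


class Handle:
    def __init__(self, key, spec):
        self.lock = threading.Lock()  # unpicklable member
        self.spec = spec
        self.key = key
        _REGISTRY[key] = self

    def __getstate__(self):
        # Only the lookup key is pickled.
        return {"key": self.key}

    def __setstate__(self, state):
        # Rebuild from the instance registered under the key, unpicklable parts included.
        self.__dict__.update(_REGISTRY[state["key"]].__dict__)
```

Usage: pickle.loads(pickle.dumps(Handle("k1", "spec1"))) returns a new object whose spec and lock come from the registered instance, although neither was ever written into the pickle.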

To achieve this without additional boilerplate, the SubClassifier instance generates a name for itself and registers a global variable under that name. This first reference is a weak one (the global holds a WeakValueDictionary), so the instance can still be garbage-collected when the user expects it. The second reference is created by the user when they assign classifier = SubClassifier(classifier_spec). This one doesn't have to be global.
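
The weak first reference is what lets the instance die normally: a WeakValueDictionary entry disappears as soon as the last strong reference goes away. A minimal sketch with a hypothetical Model class; on CPython the entry vanishes immediately thanks to reference counting, and the explicit gc.collect() is only there for interpreters that collect later:

```python
import gc
from weakref import WeakValueDictionary


class Model:
    """Hypothetical stand-in for the classifier; plain instances support weak references."""

    def __init__(self, spec):
        self.spec = spec


def registry_after_drop():
    registry = WeakValueDictionary()
    model = Model("spec1")            # strong reference held by the caller
    registry["uuid_example"] = model  # weak reference held by the registry
    present_before = "uuid_example" in registry
    del model                         # drop the only strong reference
    gc.collect()                      # needed only on non-refcounting interpreters
    return present_before, "uuid_example" in registry
```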

The name in the example below is generated with the help of the standard library's uuid module. A UUID is converted to a string and edited into a valid identifier (it wouldn't have to be, but it's convenient for debugging in interactive mode).

class SubClassifier(Classifier):

    def __init__(self, spec):
        super().__init__(spec)
        self.uuid = self._generate_uuid_string()
        self.pid = os.getpid()
        self._register_global()

    def __getstate__(self):
        """Define pickled content."""
        return {'uuid': self.uuid}

    def __setstate__(self, state):
        """Set state in child process."""
        self.__dict__ = state
        self.__dict__.update(self._get_instance().__dict__)

    def _get_instance(self):
        """Get reference to instance."""
        return globals()[self.uuid][self.uuid]

    @staticmethod
    def _generate_uuid_string():
        """Generate id as valid identifier."""
        # return 'uuid_' + '123' # for testing
        return 'uuid_' + str(uuid.uuid4()).replace('-', '_')

    def _register_global(self):
        """Register global reference to instance."""
        weakd = WeakValueDictionary({self.uuid: self})
        globals().update({self.uuid: weakd})

    def __del__(self):
        """Clean up globals when deleted in parent."""
        if os.getpid() == self.pid:
            globals().pop(self.uuid)

The nice thing here is that the boilerplate is gone entirely. You don't have to declare and delete globals manually, since the instance manages all of that itself in the background:

def classify(classifier_spec, data_file, n_workers):
    classifier = SubClassifier(classifier_spec)
    # assert globals()['uuid_123']['uuid_123'] # for testing
    with open(data_file, "rt") as fh, Pool(n_workers) as pool:
        rd = csv.DictReader(fh)
        yield from pool.imap_unordered(classifier.classify, rd)


if __name__ == '__main__':

    PATHFILE = 'data.csv'
    N_WORKERS = 4

    g = classify(classifier_spec='spec1', data_file=PATHFILE, n_workers=N_WORKERS)
    for record in g:
        print(record)

    # assert 'uuid_123' not in globals()  # no reference left
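
The whole technique depends on the workers being forked, because __setstate__ can only find the instance in a copy of the parent's memory. The default start method is spawn on Windows and on macOS (since Python 3.8), and forkserver on Linux as of Python 3.14, so it may be worth requesting fork explicitly through multiprocessing.get_context. A condensed sketch under that assumption, feeding in-memory rows instead of a CSV file; ForkShared and classify_rows are hypothetical names, and fork is unavailable on Windows:

```python
import os
import threading
import uuid
import multiprocessing
from weakref import WeakValueDictionary

# One module-level registry; forked workers inherit a copy of it.
_INSTANCES = WeakValueDictionary()


class ForkShared:
    """Hypothetical condensed variant of SubClassifier."""

    def __init__(self, spec):
        self.lock = threading.Lock()  # unpicklable member
        self.spec = spec
        self.key = uuid.uuid4().hex
        _INSTANCES[self.key] = self   # weak: does not keep the instance alive

    def __getstate__(self):
        return {"key": self.key}

    def __setstate__(self, state):
        # Works only in a process forked while the parent's instance was alive.
        self.__dict__.update(_INSTANCES[state["key"]].__dict__)

    def classify(self, row):
        return self.spec, row["value"], os.getpid()


def classify_rows(spec, rows, n_workers=2):
    shared = ForkShared(spec)
    # Raises ValueError on platforms without fork.
    ctx = multiprocessing.get_context("fork")
    with ctx.Pool(n_workers) as pool:
        return list(pool.imap_unordered(shared.classify, rows))
```

Keeping a single WeakValueDictionary instead of one global per instance means no __del__ cleanup is needed: the entry disappears together with the parent's last strong reference.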
Darkonaut answered Oct 08 '22 20:10