I frequently find myself writing programs in Python that construct a large (megabytes) read-only data structure and then use that data structure to analyze a very large (hundreds of megabytes in total) list of small records. Each of the records can be analyzed in parallel, so a natural pattern is to set up the read-only data structure and assign it to a global variable, then create a `multiprocessing.Pool` (which implicitly copies the data structure into each worker process, via `fork`) and then use `imap_unordered` to crunch the records in parallel. The skeleton of this pattern tends to look like this:
```python
import csv
import multiprocessing

classifier = None  # set by classify(), inherited by the forked workers

def classify_row(row):
    return classifier.classify(row)

def classify(classifier_spec, data_file):
    global classifier
    try:
        classifier = Classifier(classifier_spec)  # Classifier is defined elsewhere
        with open(data_file, "rt") as fp, \
             multiprocessing.Pool() as pool:
            rd = csv.DictReader(fp)
            yield from pool.imap_unordered(classify_row, rd)
    finally:
        classifier = None
```
I'm not happy with this because of the global variable and the implicit coupling between `classify` and `classify_row`. Ideally, I would like to write
```python
def classify(classifier_spec, data_file):
    classifier = Classifier(classifier_spec)
    with open(data_file, "rt") as fp, \
         multiprocessing.Pool() as pool:
        rd = csv.DictReader(fp)
        yield from pool.imap_unordered(classifier.classify, rd)
```
but this does not work, because the Classifier object usually contains objects which cannot be pickled (because they are defined by extension modules whose authors didn't care about that); I have also read that it would be really slow if it did work, because the Classifier object would get copied into the worker processes on every invocation of the bound method.
Is there a better alternative? I only care about 3.x.
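The cost mentioned above can be checked directly: pickling a bound method pickles the instance it is bound to, and `Pool` pickles the callable together with each task (or chunk of tasks) it sends to a worker. A minimal sketch, assuming a hypothetical `Model` class whose 1 MB `bytes` payload stands in for the large read-only data structure:

```python
import pickle

class Model:
    """Hypothetical stand-in for a large read-only classifier."""

    def __init__(self):
        self.table = bytes(1_000_000)  # ~1 MB of "model data"

    def predict(self, row):
        return len(row)

def predict_with_global(row):
    # A module-level function is pickled by reference (module + name),
    # not by value, so its pickle stays tiny.
    return len(row)

model = Model()
bound_size = len(pickle.dumps(model.predict))  # serializes model, including model.table
plain_size = len(pickle.dumps(predict_with_global))

print(f"bound method: {bound_size} bytes, plain function: {plain_size} bytes")
```

With the default `imap_unordered` chunksize of 1, that megabyte would travel with every single row.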
You can share a global variable with all worker processes in a multiprocessing pool by defining it in the worker initialization function: the function passed as `initializer` to `Pool`, which each worker calls once with `initargs` when it starts.
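A minimal sketch of the initializer approach, assuming a small lookup table stands in for the shared data; `init_worker`, `lookup` and `run` are hypothetical names. It requests the fork context explicitly to match the setting of the question (POSIX only); with the spawn start method the `initargs` would be pickled and sent to each worker instead of being inherited.

```python
import multiprocessing

# Per-worker global. Each worker process has its own copy, assigned
# exactly once by init_worker() when the worker starts.
_table = None

def init_worker(table):
    """Called once in every worker process before it receives tasks."""
    global _table
    _table = table

def lookup(key):
    # Reads the worker's global instead of receiving the table with each task.
    return key, _table.get(key)

def run(table, keys, processes=2):
    ctx = multiprocessing.get_context("fork")  # POSIX only
    with ctx.Pool(processes, initializer=init_worker,
                  initargs=(table,)) as pool:
        # imap_unordered yields results in completion order, so sort them.
        return sorted(pool.imap_unordered(lookup, keys))

if __name__ == "__main__":
    print(run({"a": 1, "b": 2}, ["b", "a", "c"]))
```

Applied to the question, one could pass the picklable `classifier_spec` through `initargs` and build the classifier inside the initializer, at the cost of constructing it once per worker.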
No: global variables are not shared between processes unless some IPC mechanism is used. The memory space is copied, so the global variable has the same value in both processes immediately after `fork`, but if one process changes it, the other won't see the change.
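A small sketch of that copy-on-fork behaviour, using a hypothetical module-level `counter`: the worker increments its own copy, and the parent's value stays unchanged. The fork context is requested explicitly, so this runs on POSIX systems only.

```python
import multiprocessing

counter = 0  # module-level global, copied into the worker at fork time

def bump():
    """Runs in the worker: modifies the worker's own copy of counter."""
    global counter
    counter += 1
    return counter

def demo():
    ctx = multiprocessing.get_context("fork")  # POSIX only
    with ctx.Pool(1) as pool:
        child_value = pool.apply(bump)
    # The parent's counter is untouched by the worker's increment.
    return child_value, counter

if __name__ == "__main__":
    print(demo())  # (1, 0)
```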
Use `Pool.starmap()`. The multiprocessing pool's `starmap()` method calls the target function with multiple arguments, unpacking each item of the iterable into an argument list, so it can be used instead of `map()` when the target function takes more than one argument. This is probably the preferred approach for executing such a target function in a multiprocessing pool.
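A minimal `starmap()` sketch with a hypothetical two-argument `power` function; the fork context is used only so the snippet behaves the same as the rest of this thread (POSIX only).

```python
import multiprocessing

def power(base, exponent):
    # Target function taking two positional arguments.
    return base ** exponent

def run():
    pairs = [(2, 3), (3, 2), (10, 0)]
    ctx = multiprocessing.get_context("fork")  # POSIX only
    with ctx.Pool(2) as pool:
        # Each tuple is unpacked: power(2, 3), power(3, 2), power(10, 0).
        # Unlike imap_unordered, starmap returns results in input order.
        return pool.starmap(power, pairs)

if __name__ == "__main__":
    print(run())  # [8, 9, 1]
```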
`Pool` is generally used for heterogeneous tasks, that is, tasks that do not resemble each other (for example, each task submitted to the pool may use a different target function), whereas `multiprocessing.Process` is generally used for homogeneous tasks.
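As an illustration of heterogeneous tasks, a sketch that submits three different built-in target functions to the same pool with `apply_async()`; the fork context is requested explicitly (POSIX only).

```python
import math
import multiprocessing

def run_mixed():
    ctx = multiprocessing.get_context("fork")  # POSIX only
    with ctx.Pool(2) as pool:
        # Three tasks, three different target functions.
        pending = [
            pool.apply_async(sum, ([1, 2, 3],)),
            pool.apply_async(math.factorial, (5,)),
            pool.apply_async(max, ([4, 9, 2],)),
        ]
        # get() blocks until each result is ready.
        return [p.get() for p in pending]

if __name__ == "__main__":
    print(run_mixed())  # [6, 120, 9]
```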
If you want to use forking, I don't see a way around using a global. But I also don't see why you should feel bad about using a global in this case: you're not manipulating a shared global list from multiple threads or anything like that.
It's possible to cope with the ugliness in your example, though. You want to pass `classifier.classify` directly, but the `Classifier` object contains objects which cannot be pickled.
```python
import os
import csv
import uuid
from threading import Lock
from multiprocessing import Pool
from weakref import WeakValueDictionary


class Classifier:

    def __init__(self, spec):
        self.lock = Lock()  # unpickleable
        self.spec = spec

    def classify(self, row):
        return f'classified by pid: {os.getpid()} with spec: {self.spec}', row
```
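To see why passing `classifier.classify` directly fails, a quick check: pickling the bound method pickles the instance, and the `threading.Lock` inside it cannot be pickled. The snippet repeats a trimmed copy of the stand-in class so it runs on its own; `is_picklable` is a hypothetical helper.

```python
import pickle
from threading import Lock

class Classifier:
    # Trimmed copy of the stand-in above: holds an unpicklable Lock.
    def __init__(self, spec):
        self.lock = Lock()
        self.spec = spec

    def classify(self, row):
        return self.spec, row

def is_picklable(obj):
    """Return True if pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except TypeError:  # raised for objects such as locks
        return False
    return True

clf = Classifier("spec1")
print(is_picklable(clf.spec))      # True: a plain string
print(is_picklable(clf.classify))  # False: the bound method drags in clf.lock
```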
I suggest we subclass `Classifier` and define `__getstate__` and `__setstate__` to enable pickling. Since you're using forking anyway, the only state it has to pickle is the information needed to get a reference to the forked global instance. Then we just update the unpickled object's `__dict__` with the `__dict__` of the forked instance (which hasn't gone through the reduction of pickling), and the instance is complete again.
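The core of this trick can be tried in a single process, without forking. In the sketch below, a hypothetical `_REGISTRY` dict plays the role of the forked globals: only the key is serialized, and unpickling restores the full attribute set from the registered instance.

```python
import pickle
from threading import Lock

# Hypothetical in-process registry standing in for the forked globals.
_REGISTRY = {}

class Handle:
    def __init__(self, key, payload):
        self.key = key
        self.payload = payload
        self.lock = Lock()  # unpicklable on its own
        _REGISTRY[key] = self

    def __getstate__(self):
        # Only the lookup key goes into the pickle.
        return {"key": self.key}

    def __setstate__(self, state):
        # Start from the key, then copy the registered instance's attributes.
        self.__dict__ = dict(state)
        self.__dict__.update(_REGISTRY[state["key"]].__dict__)

original = Handle("h1", list(range(5)))
blob = pickle.dumps(original)
restored = pickle.loads(blob)

print(b"payload" in blob)                   # False: payload was not serialized
print(restored.payload is original.payload)  # True: attribute taken from the registry
```

In the real pattern the lookup happens in a forked worker, whose memory already contains the instance, so no data has to cross the process boundary.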
To achieve this without additional boilerplate, the `SubClassifier` instance generates a name for itself and registers a reference to itself under that name as a global variable. This first reference is a weak reference, so the instance can still be garbage collected when the user expects it to be. The second reference is created by the user when they assign `classifier = SubClassifier(classifier_spec)`; this one doesn't have to be global.
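The weak-reference part can be seen in isolation with a `WeakValueDictionary`: the entry exists only as long as some strong reference keeps the object alive. The class name `Instance` and the key `"uuid_example"` are hypothetical.

```python
import gc
from weakref import WeakValueDictionary

class Instance:
    """Hypothetical object to be tracked weakly."""

registry = WeakValueDictionary()

obj = Instance()
registry["uuid_example"] = obj     # weak reference: does not keep obj alive
print("uuid_example" in registry)  # True while a strong reference exists

del obj                            # drop the only strong reference
gc.collect()                       # not needed on CPython, harmless elsewhere
print("uuid_example" in registry)  # False: the entry vanished with the object
```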
The name in `SubClassifier` below is generated with the standard library's `uuid` module: a UUID is converted to a string and edited into a valid identifier (it wouldn't have to be one, but that's convenient for debugging in interactive mode).
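That naming step on its own: hyphens are replaced because they are not allowed in identifiers, and the `'uuid_'` prefix matters because a UUID string can start with a digit, which an identifier cannot. `generate_identifier` is a hypothetical stand-alone copy of the method.

```python
import uuid

def generate_identifier():
    """Stand-alone copy of the naming logic used by SubClassifier."""
    # str(uuid4()) looks like 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx' (36 chars).
    return 'uuid_' + str(uuid.uuid4()).replace('-', '_')

name = generate_identifier()
print(name, name.isidentifier())
```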
```python
class SubClassifier(Classifier):

    def __init__(self, spec):
        super().__init__(spec)
        self.uuid = self._generate_uuid_string()
        self.pid = os.getpid()
        self._register_global()

    def __getstate__(self):
        """Define pickled content."""
        return {'uuid': self.uuid}

    def __setstate__(self, state):
        """Set state in child process."""
        self.__dict__ = state
        self.__dict__.update(self._get_instance().__dict__)

    def _get_instance(self):
        """Get reference to instance."""
        return globals()[self.uuid][self.uuid]

    @staticmethod
    def _generate_uuid_string():
        """Generate id as valid identifier."""
        # return 'uuid_' + '123' # for testing
        return 'uuid_' + str(uuid.uuid4()).replace('-', '_')

    def _register_global(self):
        """Register global reference to instance."""
        weakd = WeakValueDictionary({self.uuid: self})
        globals().update({self.uuid: weakd})

    def __del__(self):
        """Clean up globals when deleted in parent."""
        if os.getpid() == self.pid:
            globals().pop(self.uuid)
```
The nice thing here is that the boilerplate is completely gone: you don't have to declare and delete globals manually, because the instance manages everything itself in the background:
```python
def classify(classifier_spec, data_file, n_workers):
    classifier = SubClassifier(classifier_spec)
    # assert globals()['uuid_123']['uuid_123'] # for testing
    with open(data_file, "rt") as fh, Pool(n_workers) as pool:
        rd = csv.DictReader(fh)
        yield from pool.imap_unordered(classifier.classify, rd)


if __name__ == '__main__':

    PATHFILE = 'data.csv'
    N_WORKERS = 4

    g = classify(classifier_spec='spec1', data_file=PATHFILE,
                 n_workers=N_WORKERS)
    for record in g:
        print(record)

    # assert 'uuid_123' not in globals() # no reference left
```
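Both the global-variable skeleton in the question and `SubClassifier` rely on the workers being forked from the parent. With the spawn start method (the default on macOS since Python 3.8, and the only option on Windows), each worker starts a fresh interpreter, so `globals()[self.uuid]` would not exist there and `__setstate__` would fail with a `KeyError`. A minimal sketch of making that dependency explicit by asking for the fork context instead of relying on the platform default; the helper name `make_fork_pool` is hypothetical:

```python
import multiprocessing

def make_fork_pool(n_workers):
    """Hypothetical helper: a Pool whose workers are always forked.

    get_context("fork") fails up front on platforms without fork
    (e.g. Windows), which is clearer than a KeyError inside a worker.
    """
    ctx = multiprocessing.get_context("fork")
    return ctx.Pool(n_workers)

if __name__ == "__main__":
    with make_fork_pool(2) as pool:
        print(pool.map(abs, [-1, 2, -3]))  # [1, 2, 3]
```

In `classify`, `Pool(n_workers)` would then become `make_fork_pool(n_workers)`.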