OS and Python info:
uname -a
Linux debian 5.10.0-8-amd64 #1 SMP Debian 5.10.46-4 (2021-08-03) x86_64 GNU/Linux
python3 --version
Python 3.9.2
Here is a simple class that starts a pool of worker processes.
import time
from multiprocessing.pool import Pool

class my_mp(object):
    def __init__(self):
        self.process_num = 3
        fh = open('test.txt', 'w')

    def run_task(self, i):
        print('process {} start'.format(str(i)))
        time.sleep(2)
        print('process {} end'.format(str(i)))

    def run(self):
        pool = Pool(processes=self.process_num)
        for i in range(self.process_num):
            pool.apply_async(self.run_task, args=(i,))
        pool.close()
        pool.join()
Instantiate the my_mp class, then start the processes:
ins = my_mp()
ins.run()
process 0 start
process 1 start
process 2 start
process 0 end
process 2 end
process 1 end
Now replace fh = open('test.txt', 'w') with self.fh = open('test.txt', 'w') in the my_mp class and try again.
ins = my_mp()
ins.run()
No output! Why does no process start?
>>> import time
>>> from multiprocessing.pool import Pool
>>>
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         fh = open('test.txt', 'w')
...     def run_task(self, i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes=self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task, args=(i,))
...         pool.close()
...         pool.join()
...
>>> x = my_mp()
>>> x.run()
process 0 start
process 1 start
process 2 start
process 2 end
process 0 end
process 1 end
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         self.fh = open('test.txt', 'w')
...     def run_task(self, i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes=self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task, args=(i,))
...         pool.close()
...         pool.join()
...
>>> x = my_mp()
>>> x.run()
>>> x.run()
>>> x = my_mp()
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         fh = open('test.txt', 'w')
...         self.fh = fh
...     def run_task(self, i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes=self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task, args=(i,))
...         pool.close()
...         pool.join()
...
>>> x = my_mp()
>>> x.run()
>>>
Why can't I store the file handle as self.fh in the __init__ method? I never use the file handle defined in __init__ in any of the worker processes.
Stdlib multiprocessing uses pickle to serialize objects. Anything which needs to be sent across the process boundary needs to be picklable.
Custom class instances are generally picklable, as long as all their attributes are picklable - it works by importing the type within the subprocess and unpickling the attributes.
The issue is that the object returned by open()
is not picklable.
>>> class A:
...     pass
...
>>> import pickle
>>> pickle.dumps(A())
b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x01A\x94\x93\x94)\x81\x94.'
>>> class A:
...     def __init__(self):
...         self.fh = open("test.txt", "w")
...
>>> pickle.dumps(A())
TypeError: cannot pickle '_io.TextIOWrapper' object
In the first case, the multiprocessing pool still works because fh is just a local variable: it is discarded as soon as it goes out of scope, i.e. when the __init__ method returns. But as soon as you save this handle in the instance's namespace with self.fh = open(...), a reference remains, and it will need to be sent over the process boundary.
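That also explains why there is no output and no error: apply_async does not raise immediately. The pickling failure is stored on the AsyncResult object and is only re-raised when you call .get() on it. Here is a minimal sketch of a run method that surfaces the error (keeping the result objects is my addition, not in the original code):

def run(self):
    pool = Pool(processes=self.process_num)
    # Keep the AsyncResult objects instead of discarding them.
    results = [pool.apply_async(self.run_task, args=(i,))
               for i in range(self.process_num)]
    pool.close()
    for r in results:
        r.get()  # re-raises: TypeError: cannot pickle '_io.TextIOWrapper' object
    pool.join()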
You might think that since you've only scheduled the method self.run_task to execute in the pool, the state set in __init__ doesn't matter - but that's not the case. There is still a reference:
>>> ins = my_mp()
>>> ins.run_task.__self__.__dict__
{'process_num': 3,
'fh': <_io.TextIOWrapper name='test.txt' mode='w' encoding='UTF-8'>}
Note that calling ins = my_mp()
runs the __init__
method in the main process, and ins.run_task
is the object which gets sent over the process boundary.
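If you want to keep both the stdlib pool and the handle on the instance, one workaround (a sketch, not part of the original code) is to exclude the handle from the pickled state with __getstate__ and __setstate__, so the worker copies simply don't carry it:

import time
from multiprocessing.pool import Pool

class my_mp(object):
    def __init__(self):
        self.process_num = 3
        self.fh = open('test.txt', 'w')

    def __getstate__(self):
        # Copy the instance dict but leave the unpicklable handle out.
        state = self.__dict__.copy()
        del state['fh']
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.fh = None  # worker copies get no handle; only the parent owns the file

    def run_task(self, i):
        print('process {} start'.format(str(i)))
        time.sleep(2)
        print('process {} end'.format(str(i)))

    def run(self):
        pool = Pool(processes=self.process_num)
        for i in range(self.process_num):
            pool.apply_async(self.run_task, args=(i,))
        pool.close()
        pool.join()

Only the parent process keeps a usable self.fh here; the worker copies see None, which is fine as long as run_task never touches it.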
There is a third-party library which provides a drop-in replacement for the stdlib multiprocessing Pool - pip install pathos
and replace the multiprocessing import with:
from pathos.multiprocessing import Pool
pathos uses dill, a more powerful serialization library than pickle, so it is able to serialize the objects returned by open(). Your code should work again without any other changes. However, beware that each worker process gets its own copy of the file handle and will not know about the other processes writing to self.fh, so whichever worker writes last may overwrite data written earlier by another process.
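If the goal is to collect output from the workers into test.txt, a more robust pattern (again a sketch, not the original code) is to avoid sharing the handle altogether: use a plain module-level function so nothing unpicklable is sent, have the workers return their data, and let the parent process do all the writing:

import time
from multiprocessing.pool import Pool

def run_task(i):
    # Workers only compute and return data; they never touch the file.
    time.sleep(2)
    return 'result from process {}\n'.format(i)

if __name__ == '__main__':
    with Pool(processes=3) as pool:
        results = [pool.apply_async(run_task, args=(i,)) for i in range(3)]
        with open('test.txt', 'w') as fh:
            for r in results:
                fh.write(r.get())  # only the parent writes; nothing gets overwritten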