
Why can't I add a file handle as self.fh in the __init__ method?

OS and Python info:

uname -a
Linux debian 5.10.0-8-amd64 #1 SMP Debian 5.10.46-4 (2021-08-03) x86_64 GNU/Linux
python3 --version
Python 3.9.2

Here is a simple class that runs tasks in a multiprocessing pool.

import time
from multiprocessing.pool import Pool

class my_mp(object):
    def __init__(self):
        self.process_num = 3
        fh = open('test.txt', 'w')
    def run_task(self,i):
        print('process {} start'.format(str(i)))
        time.sleep(2)
        print('process {} end'.format(str(i)))
    def run(self):
        pool = Pool(processes = self.process_num)
        for i in range(self.process_num):
            pool.apply_async(self.run_task,args = (i,))
        pool.close()
        pool.join()

Instantiate the my_mp class, then start the multiprocessing run.

ins = my_mp()
ins.run()
process 0 start
process 1 start
process 2 start
process 0 end
process 2 end
process 1 end

Now replace fh = open('test.txt', 'w') with self.fh = open('test.txt', 'w') in the my_mp class and try again.

ins = my_mp()
ins.run()    

No output! Why does no process start?

>>> import time
>>> from multiprocessing.pool import Pool
>>> 
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         fh = open('test.txt', 'w')
...     def run_task(self,i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes = self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task,args = (i,))
...         pool.close()
...         pool.join()
... 
>>> x = my_mp()
>>> x.run()
process 0 start
process 1 start
process 2 start
process 2 end
process 0 end
process 1 end
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         self.fh = open('test.txt', 'w')
...     def run_task(self,i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes = self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task,args = (i,))
...         pool.close()
...         pool.join()
... 
>>> x = my_mp()
>>> x.run()
>>> x.run()
>>> x = my_mp()
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         fh = open('test.txt', 'w')
...         self.fh = fh
...     def run_task(self,i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes = self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task,args = (i,))
...         pool.close()
...         pool.join()
... 
>>> x = my_mp()
>>> x.run()
>>> 

Why can't I store the file handle as self.fh in the __init__ method? I never even use the file handle defined in __init__ in any of the worker processes.



1 Answer

The problem:

Stdlib multiprocessing uses pickle to serialize objects. Anything which needs to be sent across the process boundary needs to be picklable.

Custom class instances are generally picklable, as long as all their attributes are picklable: pickling works by importing the type within the subprocess and unpickling the attributes.

The issue is that the object returned by open() is not picklable.

>>> class A:
...     pass
... 
>>> import pickle
>>> pickle.dumps(A())
b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x01A\x94\x93\x94)\x81\x94.'
>>> class A:
...     def __init__(self):
...         self.fh = open("test.txt", "w")
... 
>>> pickle.dumps(A())
TypeError: cannot pickle '_io.TextIOWrapper' object

In the first case, the multiprocessing pool still works because fh is just a local variable: it is discarded as soon as it goes out of scope, i.e. when the __init__ method returns. But as soon as you save the handle in the instance's namespace with self.fh = open(...), a reference remains, and it has to be sent over the process boundary.

You might think that since only the method self.run_task is scheduled to execute in the pool, the state set in __init__ shouldn't matter. That's not the case: there is still a reference:

>>> ins = my_mp()
>>> ins.run_task.__self__.__dict__
{'process_num': 3,
 'fh': <_io.TextIOWrapper name='test.txt' mode='w' encoding='UTF-8'>}

Note that calling ins = my_mp() runs the __init__ method in the main process, and ins.run_task is the object which gets sent over the process boundary.
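
You can verify this directly by pickling the bound method yourself (a minimal sketch; this is the same TypeError the pool hits internally):

>>> import pickle
>>> ins = my_mp()               # the variant that sets self.fh
>>> pickle.dumps(ins.run_task)  # pickling a bound method also pickles its __self__
TypeError: cannot pickle '_io.TextIOWrapper' object

This also explains why there was no error output: apply_async stores the serialization error on its AsyncResult rather than raising it, so it only surfaces if you keep the result object and call .get() on it.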

Solution:

There is a third-party library that provides a drop-in replacement for the stdlib multiprocessing Pool: pip install pathos and replace the multiprocessing import with:

from pathos.multiprocessing import Pool

pathos uses dill, a more powerful serialization library than pickle, so it is able to serialize the objects returned by open(). Your code should work again without any other changes. Be aware, however, that each worker process knows nothing about the other processes writing bytes to self.fh, so whichever worker writes last may overwrite data written earlier by another process.
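
Putting it together, a minimal sketch of the fixed version (assuming pathos is installed; apart from the import, the class is essentially unchanged from the question):

import time
from pathos.multiprocessing import Pool  # drop-in replacement; serializes with dill

class my_mp(object):
    def __init__(self):
        self.process_num = 3
        self.fh = open('test.txt', 'w')  # dill can serialize this handle; pickle cannot
    def run_task(self, i):
        print('process {} start'.format(str(i)))
        time.sleep(2)
        print('process {} end'.format(str(i)))
    def run(self):
        pool = Pool(processes = self.process_num)
        for i in range(self.process_num):
            pool.apply_async(self.run_task, args = (i,))
        pool.close()
        pool.join()

ins = my_mp()
ins.run()

If you would rather stay with the stdlib, one common alternative (not part of the pathos suggestion above) is to exclude the unpicklable handle from the pickled state, for example:

    def __getstate__(self):
        # stdlib-only workaround: drop the open file handle before pickling;
        # reopen the file inside the worker if it is actually needed there
        state = self.__dict__.copy()
        del state['fh']
        return state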
