There are three questions as possible duplicates (but too specific):
By answering this question all three other questions can be answered. Hopefully I make myself clear:
Once I created an object in some process created by multiprocessing:
Example 1 (solved)
from concurrent.futures import *

def f(v):
    return lambda: v * v

if __name__ == '__main__':
    with ThreadPoolExecutor(1) as e: # works with ThreadPoolExecutor
        l = list(e.map(f, [1,2,3,4]))
    print([g() for g in l]) # [1, 4, 9, 16]
Example 2
Suppose f returns an object with mutable state. This identical object should be accessible from other processes.
Example 3
I have an object which has an open file and a lock - how do I grant access to other processes?
Reminder
I am not asking how to make this specific error go away, nor for a solution to this specific use case. The solution should be general enough to share unmovable objects between processes. The objects can potentially be created in any process. A solution that makes all objects movable and preserves identity would be good, too.
Any hints are welcome, any partial solution or code fragments that point at how to implement a solution are worth something. So we can create a solution together.
Here is an attempt to solve this but without multiprocessing: https://github.com/niccokunzmann/pynet/blob/master/documentation/done/tools.rst
Questions
What do you want the other processes to do with the references?
The references can be passed to any other process created with multiprocessing (duplicate 3). One can access attributes and call the reference. Accessed attributes may or may not be proxies.
What's the problem with just using a proxy?
Maybe there is no problem but a challenge. My impression was that a proxy has a manager and that a manager has its own process, so the unserializable object must be serialized and transferred (partially solved with StacklessPython/fork). Also, proxies exist for special objects; it is hard but not impossible to build a proxy for all objects (solvable).
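To make the proxy-and-manager idea concrete, here is a minimal sketch using `multiprocessing.managers.BaseManager` from the standard library. The names `Counter`, `CounterManager`, `worker` and `demo` are hypothetical and only serve the illustration. Note that in this sketch the object is created inside the manager process, so nothing unserializable has to be moved; only the proxy travels between processes.

```python
from multiprocessing import Process
from multiprocessing.managers import BaseManager


class Counter:
    """Hypothetical object with mutable state; it lives in the manager process."""

    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1

    def get(self):
        return self.value


class CounterManager(BaseManager):
    """Hypothetical manager subclass that serves Counter instances."""


# After registration, manager.Counter() creates the object in the manager
# process and returns a proxy to it instead of a copy.
CounterManager.register("Counter", Counter)


def worker(counter_proxy):
    # The proxy is picklable; the call is forwarded to the manager process.
    counter_proxy.increment()


def demo():
    # Starts the manager process, mutates the counter from a child process
    # and from the parent, then reads the state back through the proxy.
    with CounterManager() as manager:
        counter = manager.Counter()
        p = Process(target=worker, args=(counter,))
        p.start()
        p.join()
        counter.increment()
        return counter.get()
```

Call `demo()` from under an `if __name__ == '__main__':` guard, as in Example 1. Both increments act on the same object because neither process holds the state itself; each only holds a reference to the object in the manager.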
Solution? - Proxy + Manager?
Eric Urban showed that serialization is not the problem. The real challenge is in Example2&3: the synchronization of state. My idea of a solution would be to create a special proxy class for a manager. This proxy class
However, tasks handed to worker processes must be picklable; otherwise a pickling error is raised. That's because the processes working on a task may need to share data, but they do not share memory space, so the data has to be serialized and sent across.
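The pickling limitation can be reproduced without starting any process. The sketch below assumes standard CPython (not Stackless); `make_square` mirrors `f` from Example 1 and `can_pickle` is a hypothetical helper.

```python
import pickle


def make_square(v):
    # Same shape as f in Example 1: returns a lambda created inside a function.
    return lambda: v * v


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # Which exception is raised depends on the Python version.
        return False


print(can_pickle([1, 4, 9, 16]))   # True
print(can_pickle(make_square(3)))  # False on standard CPython
```

Plain data such as the list of results can be sent between processes, while the lambda returned by the function cannot, which is why Example 1 only works with threads.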
Pool supports multiple tasks, whereas the multiprocessing.Process class supports a single task. The Pool is designed to submit and execute multiple tasks. For example, the map(), imap(), and starmap() functions are explicitly designed to perform multiple function calls in parallel.
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
Before reading this answer, please note that the solution explained in it is terrible. Please note the warning at the end of the answer.
I found a way to share the state of an object through multiprocessing.Array.
So I made this class that transparently shares its state through all processes:
import multiprocessing as m
import pickle

class Store:
    pass

class Shareable:
    def __init__(self, size = 2**10):
        # Bypass our own __setattr__ so the array is stored on the instance itself.
        object.__setattr__(self, 'store', m.Array('B', size))
        o = Store() # This object will hold all shared values
        s = pickle.dumps(o)
        store(object.__getattribute__(self, 'store'), s)

    def __getattr__(self, name):
        # Only called for attributes not found normally: read them from the shared Store.
        s = load(object.__getattribute__(self, 'store'))
        o = pickle.loads(s)
        return getattr(o, name)

    def __setattr__(self, name, value):
        # Read-modify-write of the whole pickled Store (not atomic across processes).
        s = load(object.__getattribute__(self, 'store'))
        o = pickle.loads(s)
        setattr(o, name, value)
        s = pickle.dumps(o)
        store(object.__getattribute__(self, 'store'), s)

def store(arr, s):
    for i, ch in enumerate(s):
        arr[i] = ch

def load(arr):
    # Copy the whole array; pickle.loads ignores the unused bytes after the pickle.
    return bytes(arr[:])
You can pass instances of this class (and its subclasses) to any other process and it will synchronize its state through all processes. This was tested with this code:
class Foo(Shareable):
    def __init__(self):
        super().__init__()
        self.f = 1

    def foo(self):
        self.f += 1

def f(s):
    s.f += 1

if __name__ == '__main__':
    import multiprocessing as m
    import time
    s = Foo()
    print(s.f)
    p = m.Process(target=f, args=(s,))
    p.start()
    time.sleep(1)
    print(s.f)
The "magic" of this class is that it stores all of its attributes in another instance of the class Store. This class isn't very special. It's just some class that can have arbitrary attributes. (A dict would have done as well.)
However, this class has some really nasty quirks. I found two.
The first quirk is that you have to specify how much space the Store instance will take at most. This is because multiprocessing.Array has a static size. So the object that can be pickled in it can only be as large as the array.
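One way to make the size limit explicit is to compare the pickled size with the buffer length before writing. This is only a sketch: `checked_store` is a hypothetical variant of the `store` function above, and a plain `bytearray` stands in for the `multiprocessing.Array('B', size)` so that the example runs without shared memory.

```python
import pickle


def checked_store(arr, obj):
    """Pickle obj and copy it into arr, refusing payloads that do not fit."""
    payload = pickle.dumps(obj)
    if len(payload) > len(arr):
        raise ValueError(
            f"pickled object needs {len(payload)} bytes, buffer holds {len(arr)}"
        )
    for i, byte in enumerate(payload):
        arr[i] = byte
    return len(payload)


buf = bytearray(64)  # stand-in (assumption) for multiprocessing.Array('B', 64)
written = checked_store(buf, {"f": 1})
print(pickle.loads(bytes(buf)) == {"f": 1})  # True; trailing zero bytes are ignored
```

The check does not remove the quirk, since the maximum size still has to be chosen up front, but it turns an overflow into a clear error message.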
The second quirk is that you can't use this class with ProcessPoolExecutors or simple Pools. If you try to do this, you get an error:
>>> s = Foo()
>>> with ProcessPoolExecutor(1) as e:
...     e.submit(f, args=(s,))
...
<Future at 0xb70fe20c state=running>
Traceback (most recent call last):
<omitted>
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance
Warning
You should probably not use this approach, as it uses an uncontrollable amount of memory, is overly complicated compared to using a proxy (see my other answer) and might crash in spectacular ways.
Just use Stackless Python. You can serialize almost anything with pickle, including functions. Here I serialize and deserialize a lambda using the pickle module. This is similar to what you are trying to do in your example.
Here is the download link for Stackless Python http://www.stackless.com/wiki/Download
Python 2.7.5 Stackless 3.1b3 060516 (default, Sep 23 2013, 20:17:03)
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> f = 5
>>> g = lambda : f * f
>>> g()
25
>>> import pickle
>>> p = pickle.dumps(g)
>>> m = pickle.loads(p)
>>> m()
25
>>>