I am trying to use concurrent.futures.ProcessPoolExecutor
with Locks, but I'm getting a run time error.
(I'm working on Windows if that's relevant)
Here's my code:
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import time
def f(i, lock):
with lock:
print(i, 'hello')
time.sleep(1)
print(i, 'world')
def main():
lock = multiprocessing.Lock()
pool = ProcessPoolExecutor()
futures = [pool.submit(f, num, lock) for num in range(3)]
for future in futures:
future.result()
if __name__ == '__main__':
main()
Here's the error I get:
Traceback (most recent call last):
File "F:\WinPython-64bit-3.4.3.2\python-3.4.3.amd64\Lib\multiprocessing\queues.py", line 242, in _feed
obj = ForkingPickler.dumps(obj)
File "F:\WinPython-64bit-3.4.3.2\python-3.4.3.amd64\Lib\multiprocessing\reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
File "F:\WinPython-64bit-3.4.3.2\python-3.4.3.amd64\Lib\multiprocessing\synchronize.py", line 102, in __getstate__
context.assert_spawning(self)
File "F:\WinPython-64bit-3.4.3.2\python-3.4.3.amd64\Lib\multiprocessing\context.py", line 347, in assert_spawning
' through inheritance' % type(obj).__name__
RuntimeError: Lock objects should only be shared between processes through inheritance
What's weird is that if I write the same code with multiprocessing.Process
it all works fine:
import multiprocessing
import time
def f(i, lock):
with lock:
print(i, 'hello')
time.sleep(1)
print(i, 'world')
def main():
lock = multiprocessing.Lock()
processes = [multiprocessing.Process(target=f, args=(i, lock)) for i in range(3)]
for process in processes:
process.start()
for process in processes:
process.join()
if __name__ == '__main__':
main()
This works and I get:
1 hello
1 world
0 hello
0 world
2 hello
2 world
The ProcessPoolExecutor implements the Executor abstract class and provides a process pool in Python. In this tutorial, you will discover the ProcessPoolExecutor class. Let's get started.
Python provides a mutual exclusion lock for use with processes via the multiprocessing. Lock class. An instance of the lock can be created and then acquired by processes before accessing a critical section, and released after the critical section.
The ProcessPoolExecutor in Python provides a process pool that lets you run tasks concurrently. You can add tasks to the pool by calling submit() with your function name, which will return a Future object. You can call the cancel() function on the Future object to cancel the task before it has started running.
ThreadPoolExecutor Thread-Safety Although the ThreadPoolExecutor uses threads internally, you do not need to work with threads directly in order to execute tasks and get results. Nevertheless, when accessing resources or critical sections, thread-safety may be a concern.
You need to use a Manager
and use a Manager.Lock()
instead:
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import time
def f(i, lock):
with lock:
print(i, 'hello')
time.sleep(1)
print(i, 'world')
def main():
pool = ProcessPoolExecutor()
m = multiprocessing.Manager()
lock = m.Lock()
futures = [pool.submit(f, num, lock) for num in range(3)]
for future in futures:
future.result()
if __name__ == '__main__':
main()
Result:
% python locks.py
0 hello
0 world
1 hello
1 world
2 hello
2 world
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With