I have implemented a SharedList in Python (version 3.7) using Manager and Lock from multiprocessing. It is used as a shared object among processes created with multiprocessing.Process, and each process stores the values/objects it generates in this shared list.
Implementation of SharedList with Manager and Lock from Python's multiprocessing
from multiprocessing import Manager, Lock

class SharedList(object):
    def __init__(self, limit):
        # Manager-backed list shared across processes, guarded by a Lock
        self.manager = Manager()
        self.results = self.manager.list([])
        self.lock = Lock()
        self.limit = limit

    def append(self, new_value):
        with self.lock:
            # refuse to grow past the limit; callers use this to stop
            if len(self.results) == self.limit:
                return False
            self.results.append(new_value)
            return True

    def list(self):
        with self.lock:
            # return a plain local copy of the managed list
            return list(self.results)
Usage of the created SharedList to store values generated by multiple processes created with multiprocessing
results = SharedList(limit)
num_processes = min(process_count, limit)

processes = []
for i in range(num_processes):
    # args must be a tuple, hence the trailing comma
    new_process = Process(target=child_function, args=(results,))
    processes.append(new_process)
    new_process.start()

for _process in processes:
    _process.join()

for _process in processes:
    _process.close()
Implementation of child_function
def child_function(results):
    while True:
        result = func()
        # stop once the shared list is full and append() returns False
        if not results.append(result):
            break
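For reference, a minimal end-to-end driver tying the SharedList, process creation and child_function above together (func() here is only a stand-in for the real value-producing function, and limit/process_count are placeholder values; the __main__ guard matters if the start method is spawn):

import random
from multiprocessing import Process

def func():
    # stand-in for the real value-producing work
    return random.random()

if __name__ == '__main__':
    limit = 100          # placeholder value
    process_count = 4    # placeholder value
    results = SharedList(limit)

    processes = []
    for i in range(min(process_count, limit)):
        new_process = Process(target=child_function, args=(results,))
        processes.append(new_process)
        new_process.start()

    for _process in processes:
        _process.join()

    print(len(results.list()))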
The implementation works in some scenarios, but hangs when I increase the limit. I have also run the same experiment with a process count smaller than the number of CPUs, and it still hangs at the same point.

Are there better approaches to this problem? I have looked into alternatives such as using a Queue, but that does not work as expected either; it also hangs.
Added the previous implementation using Queue
import multiprocessing
from time import sleep

results_out = []
manager = multiprocessing.Manager()
results = manager.Queue()
tasks = manager.Queue()

num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    new_process = multiprocessing.Process(target=child_function,
                                          args=(tasks, results))
    processes.append(new_process)
    new_process.start()

sleep(5)

# queue one task per desired result
for i in range(limit):
    tasks.put(0)

sleep(1)

# queue one poison pill (-1) per worker so each process can exit its loop
for i in range(num_processes):
    tasks.put(-1)

# collect results until every worker has echoed back a poison pill
num_finished_processes = 0
while True:
    new_result = results.get()
    if new_result == -1:
        num_finished_processes += 1
        if num_finished_processes == num_processes:
            break
    else:
        results_out.append(new_result)

for process in processes:
    process.join()

for process in processes:
    process.close()
In child_function

def child_function(tasks, results):
    while True:
        task_val = tasks.get()
        if task_val < 0:
            # echo the poison pill so the parent can count finished workers
            results.put(-1)
            break
        else:
            result = func()
            results.put(result)
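One small change that at least surfaces the hang instead of blocking forever is a timeout on results.get() in the collection loop, so a missing result raises queue.Empty rather than waiting indefinitely (the 30-second timeout below is an arbitrary choice):

import queue

num_finished_processes = 0
while num_finished_processes < num_processes:
    try:
        new_result = results.get(timeout=30)  # raises queue.Empty if nothing arrives in time
    except queue.Empty:
        raise RuntimeError('no result within 30s; a worker may have exited early')
    if new_result == -1:
        num_finished_processes += 1
    else:
        results_out.append(new_result)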
Updated
I had gone through the following references before posting this question, but I am unable to get the desired output. I agree that this code can lead to a deadlock, but I am unable to find a deadlock-free implementation using multiprocessing in Python.
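For comparison, a Pool-based variant of the same producer pattern would look roughly like the sketch below: it submits exactly limit tasks and collects results as they finish (func() is assumed to take no arguments, and I have not verified that this behaves any better at larger limits):

from multiprocessing import Pool

def produce(_):
    # ignores its argument; just calls the same value-producing func() as above
    return func()

if __name__ == '__main__':
    with Pool(processes=min(process_count, limit)) as pool:
        results_out = list(pool.imap_unordered(produce, range(limit)))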
References
Multiprocessing of shared list
https://pymotw.com/2/multiprocessing/basics.html
Shared variable in python's multiprocessing
https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing
https://medium.com/@urban_institute/using-multiprocessing-to-make-python-code-faster-23ea5ef996ba
http://kmdouglass.github.io/posts/learning-pythons-multiprocessing-module/
python multiprocessing/threading cleanup
Based on the suggestions, I was able to modify SharedList to use a Queue
from multiprocessing import Manager
from time import sleep

class SharedList(object):
    def __init__(self, limit):
        self.manager = Manager()
        self.tasks = self.manager.Queue()
        self.results = self.manager.Queue()
        self.limit = limit
        self.no_of_process = min(process_count, limit)

    def setup(self):
        sleep(1)
        # one task per desired result
        for i in range(self.limit):
            self.tasks.put(0)
        sleep(1)
        # one poison pill (-1) per worker
        for i in range(self.no_of_process):
            self.tasks.put(-1)

    def append(self, new_value):
        task_val = self.tasks.get()
        if task_val < 0:
            # no tasks left: echo the poison pill and tell the caller to stop
            self.results.put(-1)
            return False
        else:
            self.results.put(new_value)
            return True

    def list(self):
        # drain the results queue until every worker has reported completion
        results_out = []
        num_finished_processes = 0
        while True:
            new_result = self.results.get()
            if new_result == -1:
                num_finished_processes += 1
                if num_finished_processes == self.no_of_process:
                    break
            else:
                results_out.append(new_result)
        return results_out
This implementation works with the following change to the driver code
results = SharedList(limit)
num_processes = min(process_count, limit)

processes = []
for i in range(num_processes):
    new_process = Process(target=child_function, args=(results,))
    processes.append(new_process)
    new_process.start()

results.setup()

for _process in processes:
    _process.join()

for _process in processes:
    _process.close()
Implementation of child_function
def child_function(results):
    while True:
        result = func()
        if not results.append(result):
            break
But again, this ends up in a deadlock, hanging after some iterations.

I found the article below, based on Ray, which sounds interesting and easy to use for implementing parallel computing effectively and time-efficiently:

https://towardsdatascience.com/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8
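Following that tutorial, the equivalent pattern would look roughly like this (func() and limit are the same placeholders as above; I have not benchmarked it against the multiprocessing versions):

import ray

ray.init()

@ray.remote
def produce():
    # same value-producing work as func() above
    return func()

# launch limit tasks and gather their results
futures = [produce.remote() for _ in range(limit)]
results_out = ray.get(futures)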