What is the proper way to use a shared list in multiprocessing?

I have implemented a SharedList in Python (version 3.7) using the Manager and Lock from multiprocessing. It is shared among the processes created with multiprocessing.Process and is used to store the values/objects generated by each process.

Implementation of SharedList with Manager and Lock from Python's multiprocessing

from multiprocessing import Manager, Lock

class SharedList(object):
    def __init__(self, limit):
        self.manager = Manager()
        self.results = self.manager.list([])
        self.lock = Lock()
        self.limit = limit

    def append(self, new_value):
        # Stop accepting values once the limit is reached
        with self.lock:
            if len(self.results) == self.limit:
                return False
            self.results.append(new_value)
            return True

    def list(self):
        # Return a plain-list copy of the managed list
        with self.lock:
            return list(self.results)

Usage of the SharedList to store values from multiple processes created with multiprocessing

from multiprocessing import Process

results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    # args must be a tuple, hence the trailing comma
    new_process = Process(target=child_function, args=(results,))
    processes.append(new_process)
    new_process.start()

for _process in processes:
    _process.join()

for _process in processes:
    _process.close()

Implementation of child_function

def child_function(results):
    while True:
        result = func()
        if not results.append(result):
            break

The implementation works for some scenarios, but hangs when I increase the limit. I have used a process count lower than the number of CPUs and repeated the experiment, but it still hangs at the same position.

Are there better approaches to the above problem? I have looked into alternatives such as using a Queue, but that doesn't work as expected either; it also hangs.

Added the previous implementation using Queue

Implementation using Queue

import multiprocessing
from time import sleep

results_out = []
manager = multiprocessing.Manager()
results = manager.Queue()
tasks = manager.Queue()
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    new_process = multiprocessing.Process(target=child_function,
                                          args=(tasks, results))
    processes.append(new_process)
    new_process.start()

sleep(5)
# Enqueue one task per required result
for i in range(limit):
    tasks.put(0)
sleep(1)

# Enqueue one poison pill (-1) per worker so every worker terminates
for i in range(num_processes):
    tasks.put(-1)

num_finished_processes = 0
while True:
    new_result = results.get()
    if new_result == -1:
        num_finished_processes += 1
        if num_finished_processes == num_processes:
            break
    else:
        results_out.append(new_result)

for process in processes:
    process.join()

for process in processes:
    process.close()

In child_function

def child_function(tasks, results):
    while True:
        task_val = tasks.get()
        if task_val < 0:
            # Poison pill: signal completion and exit
            results.put(-1)
            break
        else:
            result = func()
            results.put(result)

Updated

I had gone through the following references before posting this question, but I am unable to get the desired output. I agree that this code leads to a deadlock, but I cannot find a deadlock-free implementation using multiprocessing in Python (a sketch of one possible alternative follows the references below).

References

  1. Multiprocessing of shared list

  2. https://pymotw.com/2/multiprocessing/basics.html

  3. Shared variable in python's multiprocessing

  4. https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing

  5. https://medium.com/@urban_institute/using-multiprocessing-to-make-python-code-faster-23ea5ef996ba

  6. http://kmdouglass.github.io/posts/learning-pythons-multiprocessing-module/

  7. python multiprocessing/threading cleanup
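
For reference, here is a minimal sketch of a deadlock-free alternative that drops the shared list entirely and lets multiprocessing.Pool collect the results. It is only a sketch: the func body and the limit and process_count values below are placeholders, and func must be a picklable, module-level function.

from multiprocessing import Pool

def func(_):
    # Placeholder for the real work; must be defined at module level
    # so it can be pickled and sent to the worker processes.
    return 42

if __name__ == "__main__":
    limit = 100
    process_count = 4

    with Pool(processes=min(process_count, limit)) as pool:
        # imap_unordered yields results as workers finish; the pool
        # handles all inter-process communication internally, so no
        # manual locks, sentinels, or shared lists are needed.
        results_out = list(pool.imap_unordered(func, range(limit)))

    print(len(results_out))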

Based on the suggestions, I modified SharedList to use Queues

from multiprocessing import Manager
from time import sleep

class SharedList(object):
    def __init__(self, limit):
        self.manager = Manager()
        self.tasks = self.manager.Queue()
        self.results = self.manager.Queue()
        self.limit = limit
        self.no_of_process = min(process_count, limit)

    def setup(self):
        sleep(1)
        # One task per required result
        for i in range(self.limit):
            self.tasks.put(0)
        sleep(1)
        # One poison pill (-1) per worker
        for i in range(self.no_of_process):
            self.tasks.put(-1)

    def append(self, new_value):
        task_val = self.tasks.get()
        if task_val < 0:
            self.results.put(-1)
            return False
        else:
            self.results.put(new_value)
            return True

    def list(self):
        results_out = []
        num_finished_processes = 0
        while True:
            new_result = self.results.get()
            if new_result == -1:
                num_finished_processes += 1
                if num_finished_processes == self.no_of_process:
                    break
            else:
                results_out.append(new_result)
        return results_out

This implementation works with the following change to the usage code

results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
    new_process = Process(target=child_function, args=(results,))
    processes.append(new_process)
    new_process.start()

results.setup()

for _process in processes:
    _process.join()

for _process in processes:
    _process.close()

Implementation of child_function

def child_function(results):
    while True:
        result = func()
        if not results.append(result):
            break

But again this ended up in a deadlock, hanging after some iterations.
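
A minimal sketch of an alternative that avoids the manual task/sentinel bookkeeping (and with it the deadlock risk) is to use concurrent.futures instead. This assumes func is a picklable, module-level function and that exactly limit results are needed; the values of limit and process_count below are placeholders.

from concurrent.futures import ProcessPoolExecutor, as_completed

def func():
    # Placeholder for the real work; must be defined at module level
    # so it can be pickled and sent to the worker processes.
    return 42

if __name__ == "__main__":
    limit = 100
    process_count = 4

    results_out = []
    with ProcessPoolExecutor(max_workers=min(process_count, limit)) as executor:
        # Submit exactly `limit` jobs; the executor manages the worker
        # processes and result passing, so no sentinels are required.
        futures = [executor.submit(func) for _ in range(limit)]
        for future in as_completed(futures):
            results_out.append(future.result())

    print(len(results_out))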

Asked Nov 07 '22 by Amutheezan
1 Answer

I found the article below, based on Ray, which makes it straightforward to implement parallel computing effectively and time-efficiently.

https://towardsdatascience.com/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8
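
A minimal sketch of the same pattern with Ray (assuming Ray is installed; the func body and limit value are placeholders):

import ray

ray.init()

@ray.remote
def func():
    # Placeholder for the real work, executed in a Ray worker process.
    return 42

limit = 100
# Launch `limit` remote tasks and gather their results.
futures = [func.remote() for _ in range(limit)]
results_out = ray.get(futures)
print(len(results_out))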

Answered Nov 14 '22 by Amutheezan