Python sharing a deque between multiprocessing processes

I've been looking at the following questions for the past hour without any luck:

Python sharing a dictionary between parallel processes

multiprocessing: sharing a large read-only object between processes?

multiprocessing in python - sharing large object (e.g. pandas dataframe) between multiple processes

I've written a very basic test file to illustrate what I'm trying to do:

from collections import deque
from multiprocessing import Process
import numpy as np


class TestClass:
    def __init__(self):
        self.mem = deque(maxlen=4)
        self.process = Process(target=self.run)

    def run(self):
        while True:
            self.mem.append(np.array([0, 1, 2, 3, 4]))


def print_values(x):
    while True:
        print(x)


if __name__ == "__main__":
    test = TestClass()
    # Pass the function and its arguments separately; writing
    # target=print_values(test.mem) would call it right here in the parent.
    process = Process(target=print_values, args=(test.mem,))

    test.process.start()
    process.start()

Currently this outputs the following:

deque([], maxlen=4)

How can I access the values in mem from the main code or from the process that runs print_values?

Kenneth Breugelmans asked Jan 03 '23 22:01


2 Answers

Unfortunately, multiprocessing.Manager() doesn't support deque, but it does support list, dict, Queue, Value and Array. A list is fairly close, so I've used it in the example below.

from multiprocessing import Process, Manager
import numpy as np

class TestClass:
    def __init__(self):
        self.maxlen = 4
        self.manager = Manager()
        self.mem = self.manager.list()
        self.lock = self.manager.Lock()
        self.process = Process(target=self.run, args=(self.mem, self.lock))

    def run(self, mem, lock):
        while True:
            array = np.random.randint(0, high=10, size=5)
            with lock:
                # Emulate deque(maxlen=4): drop the oldest entry before appending.
                if len(mem) >= self.maxlen:
                    mem.pop(0)
                mem.append(array)

def print_values(mem, lock):
    while True:
        with lock:
            print(mem)

if __name__ == "__main__":
    test = TestClass()
    print_process = Process(target=print_values, args=(test.mem, test.lock))
    test.process.start()
    print_process.start()

    test.process.join()
    print_process.join()
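
Although Manager() has no built-in deque type, the standard library does let you register extra types on a BaseManager subclass. The sketch below assumes that route: DequeManager and producer are hypothetical names, while BaseManager.register and the default AutoProxy are the real multiprocessing mechanism. The default proxy forwards only public methods (append, copy, ...), so special methods such as __len__ are not available on it; copy() is used to pull back a local snapshot.

```python
from collections import deque
from multiprocessing import Process
from multiprocessing.managers import BaseManager


class DequeManager(BaseManager):
    """Hypothetical manager subclass used only to serve deque objects."""


# Register collections.deque under the (arbitrary) typeid "deque". With no
# proxytype given, an AutoProxy forwards public methods such as append() and
# copy() to the single deque living in the manager's server process.
DequeManager.register("deque", deque)


def producer(shared, n):
    # Each append() runs in the manager process, so maxlen=4 is enforced
    # on the one shared deque rather than on a per-process copy.
    for i in range(n):
        shared.append(i)


if __name__ == "__main__":
    with DequeManager() as manager:
        shared = manager.deque(maxlen=4)
        worker = Process(target=producer, args=(shared, 10))
        worker.start()
        worker.join()
        # copy() runs remotely and sends back an ordinary local deque.
        snapshot = shared.copy()
    print(snapshot)  # deque([6, 7, 8, 9], maxlen=4)
```

Unlike the list-plus-lock version, the maxlen trimming happens inside the deque itself, so no explicit pop(0) is needed.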

You have to be a little careful with manager objects. You can use them much like the objects they refer to, but you can't do something like mem = mem[-4:] to truncate the values: slicing the proxy returns an ordinary local list, so that assignment just rebinds mem to an unshared copy instead of changing the shared list.
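
A minimal sketch of that pitfall, assuming a Manager-backed list like the one above: slicing returns a plain list, while deleting a slice (del mem[:-4]) is forwarded to the manager process and truncates the shared list in place. The variable names are illustrative only.

```python
from multiprocessing import Manager

if __name__ == "__main__":
    with Manager() as manager:
        mem = manager.list(range(10))

        # Slicing a ListProxy returns an ordinary local list, not a proxy,
        # so "mem = mem[-4:]" would detach mem from the shared list.
        local_copy = mem[-4:]
        is_plain_list = type(local_copy) is list

        # Deleting a slice is executed by the manager process, so this
        # truncates the shared list itself to its last 4 items.
        del mem[:-4]
        shared_after = mem[:]  # plain-list snapshot of the shared contents

    print(is_plain_list, shared_after)  # True [6, 7, 8, 9]
```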

As for coding style, I might move the Manager objects outside the class or move the print_values function inside it, but for an example this works. If you move things around, just note that you can't use self.mem directly in the run method. You need to pass it in when you start the process; otherwise the fork that Python does in the background will create a new instance and it won't be shared.
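
The copying behaviour is also why the question's deque stays empty: an ordinary object handed to a child process (through args, or through self when the target is a bound method such as self.run) is inherited or pickled into the child, so changes made there never reach the parent. A small sketch, where fill is a hypothetical helper:

```python
from collections import deque
from multiprocessing import Process


def fill(d):
    # Runs in the child process and appends to the child's own copy of d.
    for i in range(10):
        d.append(i)
    print("child sees:", d)


if __name__ == "__main__":
    mem = deque(maxlen=4)
    child = Process(target=fill, args=(mem,))
    child.start()
    child.join()
    # The parent's deque is untouched, because the child worked on a copy.
    print("parent sees:", mem)  # parent sees: deque([], maxlen=4)
```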

Hopefully this works for your situation; if not, we can try to adapt it a bit.

bivouac0 answered Jan 05 '23 17:01


So by combining the code provided by @bivouac0 and the comment @Marijn Pieters posted, I came up with the following solution:

from multiprocessing import Process, Queue


class testClass:
    def __init__(self, maxlen=4):
        self.mem = Queue(maxsize=maxlen)
        self.process = Process(target=self.run)

    def run(self):
        i = 0

        while True:
            # put() blocks while the queue already holds maxlen items, so the
            # producer waits for the consumer to take one instead of spinning.
            self.mem.put(i)
            i += 1


def print_values(queue):
    while True:
        values = queue.get()
        print(values)


if __name__ == "__main__":
    test = testClass()
    print_process = Process(target=print_values, args=(test.mem,))

    test.process.start()
    print_process.start()

    test.process.join()
    print_process.join()
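
Note that Queue(maxsize=...) and deque(maxlen=...) behave differently when full: the queue makes the producer wait, while the deque silently discards the oldest item. If the "keep only the latest values" behaviour matters, one possible sketch is a helper (put_latest is a hypothetical name) that drops the oldest item on queue.Full. It is best-effort with several producers, and it retries on queue.Empty because a multiprocessing.Queue's background feeder thread may not have written a just-put item to the pipe yet.

```python
import queue
from multiprocessing import Queue


def put_latest(q, item):
    """Hypothetical helper: emulate deque(maxlen=...) on a bounded
    multiprocessing.Queue by discarding the oldest item instead of blocking."""
    while True:
        try:
            q.put_nowait(item)
            return
        except queue.Full:
            try:
                q.get_nowait()  # drop the oldest value to make room
            except queue.Empty:
                # Nothing readable yet (a consumer took it, or the feeder
                # thread has not flushed it to the pipe); just retry.
                pass


if __name__ == "__main__":
    q = Queue(maxsize=4)
    for i in range(10):
        put_latest(q, i)
    latest = [q.get(timeout=1) for _ in range(4)]
    print(latest)  # [6, 7, 8, 9]
```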

Kenneth Breugelmans answered Jan 05 '23 16:01