
Can a set() be shared between Python processes?

I am using multiprocessing in Python 2.7 to process a very large set of data. As each process runs, it adds integers to a shared mp.Manager.Queue(), but only if some other process hasn't already added the same integer. Since you can't do an "in"-style membership test for Queues, the way I'm doing it is to check each int for membership in a shared mp.Manager.list(). The list will eventually have ~30 million entries, and so membership tests will be extremely slow, nullifying the advantage of multiprocessing.

Here's a much simplified version of what I'm doing:

import multiprocessing as mp

def worker(shared_list, out_q, lock):
    # Do some processing and get an integer
    result_int = some_other_code()

    # Use a lock to ensure nothing is added to the list in the meantime
    lock.acquire()
    # This lookup can take forever when the list is large
    if result_int not in shared_list:
        out_q.put(result_int)
        shared_list.append(result_int)
    lock.release()

manager = mp.Manager()
shared_list = manager.list()
lock = manager.Lock()
out_q = manager.Queue()

for i in range(8):
    p = mp.Process(target=worker, args=(shared_list, out_q, lock))
    p.start()

I previously tried using a set() instead of an mp.Manager.list(), but it seems that each process has its own memory space, and so when I updated the set, it didn't synchronize across processes. Hence, I switched to the current approach.

Here's roughly how I previously tried using a set():

import multiprocessing as mp

def worker(shared_set, out_q, lock):
    # Do some processing and get an integer
    result_int = some_other_code()

    # Use a lock to ensure nothing is added to the set in the meantime
    lock.acquire()
    # This lookup is fast, but the set doesn't reflect additions made by other processes.
    if result_int not in shared_set:
        out_q.put(result_int)
        shared_set.add(result_int)
    lock.release()

manager = mp.Manager()
lock = manager.Lock()
out_q = manager.Queue()

# This set will NOT synchronize between processes
shared_set = set()


for i in range(8):
    p = mp.Process(target=worker, args=(shared_set, out_q, lock))
    p.start()

Note: these examples are untested and simply represent the relevant parts of my code.

Is there a way to share sets across processes, or otherwise do faster membership lookups?

EDIT: A little more information: the out_q is consumed by another process which writes the data to a single output file. There can be no duplicates. If I generate an integer and it's found to be a duplicate, the process needs to go back and generate the next-best integer.

asked Jun 08 '16 by AJSmyth


1 Answer

An obvious tweak is to use an mp.Manager.dict() instead of the set, and use arbitrary values (say, set the_dict[result_int] = 1 to indicate membership in the set). BTW, this is how "everyone" implemented sets before Python added the set type, and even now dicts and sets are implemented by basically the same code under the covers.
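To make that concrete, here's a minimal sketch of the tweak applied to the question's worker (untested, and some_other_code() is the questioner's placeholder, not defined here):

import multiprocessing as mp

def worker(shared_dict, out_q, lock):
    # Do some processing and get an integer
    result_int = some_other_code()

    with lock:
        # Membership test on a dict is a hash lookup, not the linear scan
        # a large list requires
        if result_int not in shared_dict:
            out_q.put(result_int)
            shared_dict[result_int] = 1  # the value is arbitrary; only the key matters

manager = mp.Manager()
shared_dict = manager.dict()
lock = manager.Lock()
out_q = manager.Queue()

for i in range(8):
    p = mp.Process(target=worker, args=(shared_dict, out_q, lock))
    p.start()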

Added later: I confess I don't grasp why you used both a set and a list in the original code, since the set's keys are identical to the list's contents. If order of entry isn't important, why not forget the list entirely? Then you could also drop the layer of locking needed in the original to keep the set and the list in synch.

Fleshing that out, with the dict suggestion, the whole function would become just like:

def worker(shared_dict):
    # Do some processing and get an integer
    result_int = some_other_code()
    shared_dict[result_int] = 1

Other processes could then do shared_dict.popitem() to get one value at a time (although, no, they couldn't wait on .popitem() as they do for a queue's .get()).
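For instance, a consuming process might drain the shared dict with a polling loop like this (an illustrative sketch only; handle_result is a hypothetical callback, not from the answer):

import time

def drain(shared_dict, handle_result):
    while True:
        try:
            result_int, _ = shared_dict.popitem()
        except KeyError:
            # The dict is empty; unlike Queue.get(), popitem() can't block,
            # so poll and sleep briefly instead
            time.sleep(0.01)
            continue
        handle_result(result_int)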

And one more: consider using local (process-local) sets? They'll run much faster. Then each worker won't add any duplicates it knows about, but there may be duplicates across processes. Your code didn't give any hints about what the out_q consumer does, but if there's only one then a local set in that too could weed out cross-process duplicates. Or perhaps the memory burden gets too high then? Can't guess from here ;-)
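A rough sketch of that idea, assuming a single consumer that writes the output file (some_other_code() is again the questioner's placeholder):

def worker(out_q):
    seen_locally = set()               # process-local: fast, but only sees this worker's ints
    while True:
        result_int = some_other_code()
        if result_int not in seen_locally:
            seen_locally.add(result_int)
            out_q.put(result_int)

def consumer(in_q, outfile):
    seen_everywhere = set()            # one consumer, so this catches cross-process duplicates
    while True:
        result_int = in_q.get()
        if result_int not in seen_everywhere:
            seen_everywhere.add(result_int)
            outfile.write('%d\n' % result_int)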

BIG EDIT

I'm going to suggest a different approach: don't use mp.Manager at all. Most times I see people use it, they regret it, because it's not doing what they think it's doing. What they think: it's supplying physically shared objects. What it's doing: it's supplying semantically shared objects. Physically, they live in Yet Another, under-the-covers, process, and operations on the objects are forwarded to that latter process, where they're performed by that process in its own address space. It's not physically shared at all. So, while it can be very convenient, there are substantial interprocess overheads for even the simplest operations.
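To get a feel for that overhead, you could time membership tests against a manager-backed dict versus an ordinary local set (an illustrative sketch only; absolute numbers will vary by machine):

import multiprocessing as mp
import time

if __name__ == "__main__":
    manager = mp.Manager()
    shared_dict = manager.dict(dict.fromkeys(range(1000), 1))
    local_set = set(range(1000))

    start = time.time()
    for i in range(10000):
        _ = (i % 1000) in shared_dict   # every test is a round trip to the manager process
    print("managed dict: %.3f seconds" % (time.time() - start))

    start = time.time()
    for i in range(10000):
        _ = (i % 1000) in local_set     # every test is an in-process hash lookup
    print("local set:    %.3f seconds" % (time.time() - start))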

So I suggest instead using a single, ordinary set in one process, which will be the sole code concerned with weeding out duplicates. The worker processes produce ints with no concern for duplicates - they just pass the ints on. An mp.Queue is fine for that (again, no real need for an mp.Manager.Queue).

Like so, which is a complete executable program:

N = 20

def worker(outq):
    from random import randrange
    from time import sleep
    while True:    
        i = randrange(N)
        outq.put(i)
        sleep(0.1)

def uniqueifier(inq, outq):
    seen = set()
    while True:
        i = inq.get()
        if i not in seen:
            seen.add(i)
            outq.put(i)

def consumer(inq):
    for _ in range(N):
        i = inq.get()
        print(i)

if __name__ == "__main__":
    import multiprocessing as mp
    q1 = mp.Queue()
    q2 = mp.Queue()
    consume = mp.Process(target=consumer, args=(q2,))
    consume.start()
    procs = [mp.Process(target=uniqueifier, args=(q1, q2))]
    for _ in range(4):
        procs.append(mp.Process(target=worker, args=(q1,)))
    for p in procs:
        p.start()
    consume.join()
    for p in procs:
        p.terminate()

The second queue passed to uniqueifier plays the role of your original queue: it delivers only unique integers. No attempt is made to "share memory", and so no costs due to that are paid. The only interprocess communication is via easy, explicit mp.Queue operations. There is only one set, and since it's not shared in any way it runs as fast as possible.

In effect, this just sets up a simple pipeline, although with multiple inputs.

answered Oct 08 '22 by Tim Peters