I am using multiprocessing in Python 2.7 to process a very large set of data. As each process runs, it adds integers to a shared mp.Manager.Queue(), but only if some other process hasn't already added the same integer. Since you can't do an "in"-style membership test for Queues, the way I'm doing it is to check each int for membership in a shared mp.Manager.list(). The list will eventually have ~30 million entries, and so membership tests will be extremely slow, nullifying the advantage of multiprocessing.
Here's a much simplified version of what I'm doing:
import multiprocessing as mp

def worker(shared_list, out_q, lock):
    # Do some processing and get an integer
    result_int = some_other_code()

    # Use a lock to ensure nothing is added to the list in the meantime
    lock.acquire()
    # This lookup can take forever when the list is large
    if result_int not in shared_list:
        out_q.put(result_int)
        shared_list.append(result_int)
    lock.release()

manager = mp.Manager()
shared_list = manager.list()
lock = manager.Lock()
out_q = manager.Queue()

for i in range(8):
    p = mp.Process(target=worker, args=(shared_list, out_q, lock))
    p.start()
I previously tried using a set() instead of an mp.Manager.list(), but it seems that each process has its own memory space, and so when I updated the set, it didn't synchronize across processes. Hence, I switched to the current approach.
Here's roughly how I previously tried using a set():

import multiprocessing as mp

def worker(shared_set, out_q, lock):
    # Do some processing and get an integer
    result_int = some_other_code()

    # Use a lock to ensure nothing is added to the set in the meantime
    lock.acquire()
    # This lookup is fast, but the set doesn't reflect additions made by other processes.
    if result_int not in shared_set:
        out_q.put(result_int)
        shared_set.add(result_int)
    lock.release()

manager = mp.Manager()
lock = manager.Lock()
out_q = manager.Queue()

# This set will NOT synchronize between processes
shared_set = set()

for i in range(8):
    p = mp.Process(target=worker, args=(shared_set, out_q, lock))
    p.start()
Note: these examples are untested and simply represent the relevant parts of my code.
Is there a way to share sets across processes, or otherwise do faster membership lookups?
EDIT: A little more information: the out_q is consumed by another process which writes the data to a single output file. There can be no duplicates. If I generate an integer and it's found to be a duplicate, the process needs to go back and generate the next-best integer.
An obvious tweak is to use an mp.Manager.dict() instead of the set, and use arbitrary values (say, set the_dict[result_int] = 1 to indicate membership in the set). BTW, this is how "everyone" implemented sets before Python added the set type, and even now dicts and sets are implemented by basically the same code under the covers.

Added later: I confess I don't grasp why you used both a set and a list in the original code, since the set's keys are identical to the list's contents. If order of entry isn't important, why not forget the list entirely? Then you could also drop the layer of locking needed in the original to keep the set and the list in sync.
Fleshing that out with the dict suggestion, the whole function would become just:

def worker(shared_dict):
    # Do some processing and get an integer
    result_int = some_other_code()
    shared_dict[result_int] = 1

Other processes could then do shared_dict.pop() to get one value at a time (although, no, they couldn't wait on .pop() as they do for a queue's .get()).
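To make that concrete, here is a minimal sketch of the whole setup; it's an illustration of the idea rather than a drop-in replacement, and a random integer stands in for the question's some_other_code():

import multiprocessing as mp
from random import randrange

def worker(shared_dict):
    # Stand-in for some_other_code(): produce some integers
    for _ in range(1000):
        result_int = randrange(100)
        # Setting a key records membership; re-setting an existing key is
        # harmless, so no lock and no explicit "in" test are needed here
        shared_dict[result_int] = 1

if __name__ == "__main__":
    manager = mp.Manager()
    shared_dict = manager.dict()
    procs = [mp.Process(target=worker, args=(shared_dict,)) for _ in range(8)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # The de-duplicated integers are simply the dict's keys
    print(sorted(shared_dict.keys()))

If a worker really does need to know whether its integer was already present (as in your EDIT), it could test result_int not in shared_dict first, but then a manager Lock is needed again to make the test-and-set atomic.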
And one more: consider using local (process-local) sets? They'll run much faster. Then each worker won't add any duplicates it knows about, but there may be duplicates across processes. Your code didn't give any hints about what the out_q consumer does, but if there's only one then a local set in that too could weed out cross-process duplicates. Or perhaps the memory burden gets too high then? Can't guess from here ;-)
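As a rough, untested sketch of that local-set idea (again with a random integer standing in for some_other_code(), and a None sentinel invented here to signal that a worker is finished), the consumer can do the final cross-process de-duplication:

import multiprocessing as mp
from random import randrange

def worker(out_q):
    seen = set()                     # process-local, so lookups are fast
    for _ in range(1000):
        result_int = randrange(100)  # stand-in for some_other_code()
        if result_int not in seen:   # skip duplicates this worker already produced
            seen.add(result_int)
            out_q.put(result_int)
    out_q.put(None)                  # sentinel: this worker is finished

def consumer(in_q, n_workers):
    seen = set()                     # weeds out duplicates across workers
    done = 0
    while done < n_workers:
        i = in_q.get()
        if i is None:
            done += 1
        elif i not in seen:
            seen.add(i)
            print(i)                 # or write to the single output file

if __name__ == "__main__":
    out_q = mp.Queue()
    workers = [mp.Process(target=worker, args=(out_q,)) for _ in range(8)]
    for p in workers:
        p.start()
    consumer(out_q, len(workers))
    for p in workers:
        p.join()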
I'm going to suggest a different approach: don't use mp.Manager at all. Most times I see people use it, they regret it, because it's not doing what they think it's doing. What they think: it's supplying physically shared objects. What it's doing: it's supplying semantically shared objects. Physically, they live in Yet Another, under-the-covers, process, and operations on the objects are forwarded to that latter process, where they're performed by that process in its own address space. It's not physically shared at all. So, while it can be very convenient, there are substantial interprocess overheads for even the simplest operations.
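To get a feel for that overhead, here is a rough micro-benchmark sketch comparing an ordinary in-process lookup with the same lookup routed through a manager proxy; the absolute numbers will vary by machine, but the proxy side pays an interprocess round trip per operation:

import multiprocessing as mp
import time

if __name__ == "__main__":
    lookups = 10000
    local_set = set(range(1000))
    manager = mp.Manager()
    shared_dict = manager.dict({i: 1 for i in range(1000)})

    start = time.time()
    for i in range(lookups):
        _ = (i % 1000) in local_set     # ordinary in-process hash lookup
    print("local set:    %.4f seconds" % (time.time() - start))

    start = time.time()
    for i in range(lookups):
        _ = (i % 1000) in shared_dict   # each test is a round trip to the manager process
    print("manager dict: %.4f seconds" % (time.time() - start))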
So I suggest instead using a single, ordinary set in one process, which will be the sole code concerned with weeding out duplicates. The worker processes produce ints with no concern for duplicates - they just pass the ints on. An mp.Queue is fine for that (again, no real need for an mp.Manager.Queue).
Like so, which is a complete executable program:
N = 20

def worker(outq):
    # Produce ints with no concern for duplicates; just pass them on
    from random import randrange
    from time import sleep
    while True:
        i = randrange(N)
        outq.put(i)
        sleep(0.1)

def uniqueifier(inq, outq):
    # The only process that owns the set; it forwards each int at most once
    seen = set()
    while True:
        i = inq.get()
        if i not in seen:
            seen.add(i)
            outq.put(i)

def consumer(inq):
    for _ in range(N):
        i = inq.get()
        print(i)

if __name__ == "__main__":
    import multiprocessing as mp
    q1 = mp.Queue()   # workers -> uniqueifier
    q2 = mp.Queue()   # uniqueifier -> consumer
    consume = mp.Process(target=consumer, args=(q2,))
    consume.start()
    procs = [mp.Process(target=uniqueifier, args=(q1, q2))]
    for _ in range(4):
        procs.append(mp.Process(target=worker, args=(q1,)))
    for p in procs:
        p.start()
    consume.join()
    for p in procs:
        p.terminate()
The second queue passed to uniqueifier plays the role of your original queue: it delivers only unique integers. No attempt is made to "share memory", and so no costs due to that are paid. The only interprocess communication is via easy, explicit mp.Queue operations. There is only one set, and since it's not shared in any way it runs as fast as possible.
In effect, this just sets up a simple pipeline, although with multiple inputs.