I need help understanding multiprocessing.Queue. The problem I'm facing is that the results from queue.get(...) lag hilariously behind the calls to queue.put(...) and the queue's buffer (the deque).
This leaky abstraction led me to investigate the internals of the queue. Its straightforward source code just points me to the deque implementation, which also seems simple enough that I cannot use it to explain the behavior I'm seeing. I also read that Queue uses pipes, but I can't seem to find that in the source code.
I've boiled it down to a minimal example reproducing the problem, and I specify a possible output below that.
import threading
import multiprocessing
import queue

q = None

def enqueue(item):
    global q
    if q is None:
        q = multiprocessing.Queue()
        process = threading.Thread(target=worker, args=(q,))  # or multiprocessing.Process; doesn't matter
        process.start()
    q.put(item)
    print(f'len putted item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')

def worker(local_queue):
    while True:
        try:
            while True:  # get all items
                item = local_queue.get(block=False)
                print(f'len got item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')
        except queue.Empty:
            print('empty')

if __name__ == '__main__':
    for i in range(1, 100000, 1000):
        enqueue(list(range(i)))
Output:
empty
empty
empty
len putted item: 1. qsize: 1. buffer len: 1
len putted item: 1001. qsize: 2. buffer len: 2
len putted item: 2001. qsize: 3. buffer len: 1
len putted item: 3001. qsize: 4. buffer len: 2
len putted item: 4001. qsize: 5. buffer len: 3
len putted item: 5001. qsize: 6. buffer len: 4
len putted item: 6001. qsize: 7. buffer len: 5
len putted item: 7001. qsize: 8. buffer len: 6
len putted item: 8001. qsize: 9. buffer len: 7
len putted item: 9001. qsize: 10. buffer len: 8
len putted item: 10001. qsize: 11. buffer len: 9
len putted item: 11001. qsize: 12. buffer len: 10
len putted item: 12001. qsize: 13. buffer len: 11
len putted item: 13001. qsize: 14. buffer len: 12
len putted item: 14001. qsize: 15. buffer len: 13
len putted item: 15001. qsize: 16. buffer len: 14
len got item: 1. qsize: 15. buffer len: 14
len putted item: 16001. qsize: 16. buffer len: 15
len putted item: 17001. qsize: 17. buffer len: 16
len putted item: 18001. qsize: 18. buffer len: 17
len putted item: 19001. qsize: 19. buffer len: 18
len putted item: 20001. qsize: 20. buffer len: 19
len putted item: 21001. qsize: 21. buffer len: 20
len putted item: 22001. qsize: 22. buffer len: 21
len putted item: 23001. qsize: 23. buffer len: 22
len putted item: 24001. qsize: 24. buffer len: 23
len putted item: 25001. qsize: 25. buffer len: 24
len putted item: 26001. qsize: 26. buffer len: 25
len putted item: 27001. qsize: 27. buffer len: 26
len putted item: 28001. qsize: 28. buffer len: 27
len got item: 1001. qsize: 27. buffer len: 27
empty
len putted item: 29001. qsize: 28. buffer len: 28
empty
empty
empty
len got item: 2001. qsize: 27. buffer len: 27
empty
len putted item: 30001. qsize: 28. buffer len: 28
I want you to notice the following about the result: After inserting element 28001, the worker finds that there are no elements left in the queue, whereas there are dozens more. Because of synchronization, I'm okay with only getting all but a few of them. But it only manages to find two!
And this pattern continues.
This seems to have to do with the size of the objects I put on the queue. For small objects, say i as opposed to list(range(i)), this problem does not appear. But the sizes of the objects we're talking about are still only kilobytes, not nearly large enough to justify such significant delays (in my real-world, non-minimal example this easily took minutes).
My question specifically is: How can I share (not so) large amounts of data between processes in Python? Additionally, I'd like to know where in the internal implementation of Queue this sluggishness comes from.
In other words, a multiprocessing queue is fairly slow at putting and getting individual items, so QuickQueue wraps several items in one list and enqueues that list as a single item, which is faster than putting each item individually.
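The batching idea described above can be sketched roughly as follows. This is not QuickQueue's actual API; put_batched, get_batched and batch_size are hypothetical names made up for illustration, and the sketch assumes the consumer knows how many batches to expect.

```python
import multiprocessing


def put_batched(q, items, batch_size):
    """Put `items` on `q` in chunks, so each put() carries one list."""
    for start in range(0, len(items), batch_size):
        q.put(items[start:start + batch_size])


def get_batched(q, n_batches, timeout=5.0):
    """Read `n_batches` lists from `q` and flatten them in arrival order."""
    out = []
    for _ in range(n_batches):
        # Blocking get with a timeout; raises queue.Empty if nothing arrives.
        out.extend(q.get(timeout=timeout))
    return out
```

Each batch is pickled and sent as one message, so the per-item overhead of put() and get() is paid once per batch rather than once per item.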
A queue is a data structure to which items can be added by a call to put() and from which items can be retrieved by a call to get(). multiprocessing.Queue provides a first-in, first-out (FIFO) queue, which means that items are retrieved from the queue in the order they were added.
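A minimal sketch of that FIFO behavior, using a single process for brevity. The helper name fifo_roundtrip is hypothetical, and the items are assumed to be small enough that they fit in the underlying pipe without a concurrent reader.

```python
import multiprocessing


def fifo_roundtrip(items, timeout=5.0):
    """Put every item on a fresh queue, then read them back in order."""
    q = multiprocessing.Queue()
    for item in items:
        q.put(item)
    # get() waits (up to `timeout`) until each item has reached the pipe.
    return [q.get(timeout=timeout) for _ in items]
```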
Is multiprocessing.Queue thread safe? Yes, it is. From https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes: Queues are thread and process safe.
What is a pipe? In multiprocessing, a pipe is a connection between two processes in Python. It is used to send data from one process to be received by another. Under the covers, a pipe is implemented using a pair of connection objects, provided by the multiprocessing.
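As a small illustration of the two connection objects, here is a sketch that sends one object through a pipe and reads it back. It stays in a single process to keep the example short, and pipe_roundtrip is a hypothetical helper name. Only use it with small objects: with no concurrent reader, send() on a large object could block once the pipe's buffer is full.

```python
import multiprocessing


def pipe_roundtrip(obj):
    """Send `obj` through one end of a pipe and receive it at the other."""
    parent_conn, child_conn = multiprocessing.Pipe()
    try:
        parent_conn.send(obj)     # pickles obj and writes it to the pipe
        return child_conn.recv()  # reads the bytes and unpickles them
    finally:
        parent_conn.close()
        child_conn.close()
```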
For future readers, you could also try using:
q = multiprocessing.Manager().Queue()
Instead of just
q = multiprocessing.Queue()
I haven't yet fully distilled and understood the mechanisms behind this behavior, but one source I've read claimed it's about:
"when pushing large items onto the queue, the items are essentially buffered, despite the immediate return of the queue’s put function."
The author goes on to explain more about it and a way to fix it, but for me, adding the Manager did the trick, easily and cleanly.
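To make the buffering point concrete: as far as I understand the CPython implementation, put() only appends the item to an internal buffer, and a background feeder thread later pickles it and writes it to the pipe. A non-blocking get() only checks the pipe, so it can raise queue.Empty while items are still sitting in the buffer. One consumer-side workaround is a blocking get with a timeout instead of polling in a tight loop. The sketch below assumes that reading; drain, expected and timeout are hypothetical names.

```python
import multiprocessing
import queue


def drain(q, expected, timeout=5.0):
    """Fetch `expected` items, blocking up to `timeout` seconds for each."""
    items = []
    for _ in range(expected):
        try:
            # Blocking here lets the feeder thread finish writing the item,
            # instead of spinning on get(block=False).
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            break  # nothing arrived in time; give up rather than spin
    return items
```

In the question's example, replacing the inner get(block=False) loop with a blocking get would also stop the worker from busy-waiting while the producer is still putting items.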
UPDATE: I believe this StackOverflow answer is helpful in explaining the issue.
FMQ, mentioned in the accepted answer, is also Python 2 only, which is one of the reasons I felt this answer might help more people someday.
I ran into this problem too. I was sending large numpy arrays (~300MB), and mp.Queue.get() was very slow.
After looking into the Python 2.7 source code of mp.Queue, I found that the slowest part (on Unix-like systems) is _conn_recvall() in socket_connection.c, but I did not look any deeper.
To work around the problem I built an experimental package, FMQ.
This project is inspired by the use of multiprocessing.Queue (mp.Queue). mp.Queue is slow for large data items because of the speed limitation of the pipe (on Unix-like systems).
With mp.Queue handling the inter-process transfer, FMQ implements a stealer thread, which steals an item from mp.Queue once any item is available, and puts it into a Queue.Queue. Then, the consumer process can fetch the data from the Queue.Queue immediately.
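The stealer-thread idea can be sketched in Python 3 roughly like this (where Queue.Queue is spelled queue.Queue). This is not FMQ's actual code; start_stealer and the None stop sentinel are assumptions made for illustration.

```python
import multiprocessing
import queue
import threading

_STOP = None  # hypothetical sentinel: the producer puts None to end the stream


def start_stealer(mp_queue):
    """Start a thread that moves items from `mp_queue` to a local queue."""
    local_queue = queue.Queue()

    def steal():
        while True:
            item = mp_queue.get()  # blocks until the pipe delivers an item
            local_queue.put(item)  # hand it over; no pickling on this side
            if item is _STOP:
                break

    thread = threading.Thread(target=steal, daemon=True)
    thread.start()
    return local_queue, thread
```

The consumer then reads from the returned local queue, so the slow pipe transfer overlaps with its own computation instead of happening inside its get() call.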
The speed-up is based on the assumption that both the producer and consumer processes are compute-intensive (thus multiprocessing is necessary) and the data is large (e.g. >50 227x227 images). Otherwise mp.Queue with multiprocessing or Queue.Queue with threading is good enough.
fmq.Queue can be used just like an mp.Queue.
Note that there are still some Known Issues, as this project is at an early stage.