Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Changing the Buffer size in multiprocessing.Queue

So I have a system with a producer and a consumer are connected by a queue of unlimited size, but if the consumer repeatedly calls get until the Empty exception is thrown it does not clear the queue.

I believe that this is because the thread in the queue on the consumer side which serialises the objects into the socket gets blocked once the socket buffer is full, and so it then waits until the buffer has space, however, it is possible for the consumer to call get "too fast" and so it thinks the queue is empty when in fact the thread on the other side has much more data to send but just cannot serialise it fast enough to prevent the socket appearing empty to the consumer.

I believe that this problem would be alleviated if I could change the buffer size on the underlying socket ( I am windows based). As far as I can see what I need to do then is something like:

import multiprocessing.connections as conns
conns.BUFSIZE = 2 ** 16 # is typically set as 2 ** 13 for windows
import multiprocessing.Queue as q

If I do the above, does that mean that when multirprocssing initialises a queue it will use the new buffer size which I have set in the version of multiprocessing.connections that I have already imported? Is that correct?

Also I beleive that this will only affect windows, as BUFSIZE is not used on linux machines because their all sockets are set to 60 kilobytes by default?

Has anyone tried this before? Would this have side-effects on windows? And what are the fundamental limits on socket buffer sizes on windows?

===================A code sample to demonstrate===================

# import multiprocessing.connection as conn
# conn.BUFSIZE = 2 ** 19
import sys
import multiprocessing as mp
from Queue import Empty
from time import sleep

total_length = 10**8

def supplier(q):
    print "Starting feeder"
    for i in range(total_length) :
        q.put(i)


if __name__=="__main__":

    queue = mp.Queue()

    p = mp.Process(target=supplier, args=(queue,))

    p.start()

    sleep(120)

    returned = []
    while True :
        try :
            returned.append(queue.get(block=False))
        except Empty :
            break

    print len(returned)
    print len(returned) == total_length

    p.terminate()
    sys.exit()

This sample, when run on windows, will typically only pull around 160,000 items from the queue, because the main thread can empty the buffer faster than it is refilled by the supplier and eventually it tries to pull from the queue when the buffer is empty and reports that it is empty.

You can in theory ameliorate this problem by having a larger buffer size. The two lines at the top will, i believe, on windows system, increase the default buffer size for the pipe.

If you comment them in then this script will pull more data before it exits, since it has a much higher . My main questions are then: 1) Does this actually work. 2) Is there a way to make this code use the same size of underlying buffer in windows and linux 3) Are there any unexpected side effects from setting large buffer sizes for pipes.

I am aware that in general, there is no way to know whether you have pulled all of the data from the queue (- given that the supplier runs permanently and produces data very unevenly), but I am looking for ways to improve that on a best effort basis.

like image 245
phil_20686 Avatar asked Nov 29 '16 17:11

phil_20686


People also ask

Is multiprocessing queue thread safe?

Yes, it is. From https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes: Queues are thread and process safe.

Which is the method used to change the default way to create child processes in multiprocessing?

Python provides the ability to create and manage new processes via the multiprocessing. Process class. In multiprocessing programming, we may need to change the technique used to start child processes. This is called the start method.

What is queue in multiprocessing?

A queue is a data structure on which items can be added by a call to put() and from which items can be retrieved by a call to get(). The multiprocessing. Queue provides a first-in, first-out FIFO queue, which means that the items are retrieved from the queue in the order they were added.

How do I clear a multiprocessing queue?

There is no direct way of clearing a multiprocessing. Queue . I believe the closest you have is close() , but that simply states that no more data will be pushed to that queue, and will close it when all data has been flushed to the pipe.


1 Answers

Update:

useful link of Windows Pipe for people who need it in the future(the link is provided by OP, phil_20686): https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx

Origianl:

BUFSIZE is working only when the platform is win32.

multiprocessing.Queue is built on the top of Pipe, if you change the BUFSIZE, the Queue you generated will use the updated value. see below:

class Queue(object):

    def __init__(self, maxsize=0):
        if maxsize <= 0:
            maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
        self._maxsize = maxsize
        self._reader, self._writer = Pipe(duplex=False)

When the platform is win32, Pipe code will invoke following code:

def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe
    '''
    address = arbitrary_address('AF_PIPE')
    if duplex:
        openmode = win32.PIPE_ACCESS_DUPLEX
        access = win32.GENERIC_READ | win32.GENERIC_WRITE
        obsize, ibsize = BUFSIZE, BUFSIZE
    else:
        openmode = win32.PIPE_ACCESS_INBOUND
        access = win32.GENERIC_WRITE
        obsize, ibsize = 0, BUFSIZE

    h1 = win32.CreateNamedPipe(
        address, openmode,
        win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
        win32.PIPE_WAIT,
        1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
        )

You can see that when duplex is False, outbuffer size is 0 and inbuffer size is BUFSIZE.

inbuffer is the number of bytes to reserve for the input buffer. 2**16=65536, it is the maximum bytes amount can be written in one operation without blocking, but the capacity of a buffer size varies across systems, it varies even it is on the same system, therefore it's hard to say the side effect when you set the Pipe the maximum amount.

like image 59
Haifeng Zhang Avatar answered Sep 28 '22 21:09

Haifeng Zhang