
multiprocessing.Pool and Rate limit

I'm making some API requests that are limited to 20 per second. Since the wait for each answer is about 0.5 s, I thought I'd use multiprocessing.Pool.map together with this rate-limiting decorator. So my code looks like:

import time
from multiprocessing import Pool

def f(x):
    # do stuff (one API request per element)
    ...

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args, **kwargs):
            # time.monotonic() instead of time.clock(), which was
            # deprecated and removed in Python 3.8
            elapsed = time.monotonic() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait > 0:
                time.sleep(leftToWait)
            ret = func(*args, **kwargs)
            lastTimeCalled[0] = time.monotonic()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(20)
def multi(vec):
    p = Pool(5)   
    return p.map(f, vec)

I have 4 cores, and this program works fine: there is an improvement in time compared with the loop version. Furthermore, Pool arguments of 4, 5, and 6 all work, and the time is smallest with Pool(6), but with 7+ I get errors (too many connections per second, I guess).

Also, if my function is more complicated and can make 1-5 requests, the decorator doesn't work as expected. What else can I use in this case?

UPDATE

For anyone looking to use Pool: remember to close it, otherwise you are going to use up all the RAM.

def multi(vec):
    p = Pool(5)   
    res=p.map(f, vec)
    p.close()
    return res
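
Since Python 3.3, Pool is also a context manager, so the manual close() above can be handled by a with-block instead (a sketch; f here is a stand-in for the real request function):

    from multiprocessing import Pool

    def f(x):
        # stand-in for the real API call
        return x * x

    def multi(vec):
        # The with-block calls terminate() on exit, so the worker
        # processes are released even if map() raises.
        with Pool(5) as p:
            return p.map(f, vec)

With this stand-in f, multi([1, 2, 3]) returns [1, 4, 9].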

UPDATE 2

I found that something like this WebRequestManager can do the trick. The problem is that it doesn't work with multiprocessing: with a Pool of 19-20 processes it fails, because the time is stored in the class instance you need to call when you run the request.

asked Oct 13 '25 by rpanai

1 Answer

Your indents are inconsistent above, which makes this harder to answer, but I'll take a stab.

It looks like you're rate limiting the wrong thing; if f is supposed to be limited, you need to limit the calls to f, not the calls to multi. And doing the limiting in something that's dispatched to the Pool won't work either, because the forked workers would each be limiting independently (forked processes have independent tracking of the time since the last call).
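
You can see why per-worker state can't enforce a global limit with a toy sketch (this is illustrative only, not your API code): module-level state mutated inside a forked worker stays local to that worker, so the parent never observes it.

    import os
    from multiprocessing import Pool

    state = {"last": None}

    def stamp(_):
        # Each forked worker mutates its own copy of `state`;
        # the parent process never sees these writes.
        prev = state["last"]
        state["last"] = os.getpid()
        return prev

    if __name__ == "__main__":
        with Pool(4) as p:
            p.map(stamp, range(8))
        print(state["last"])  # still None in the parent

A shared timestamp like lastTimeCalled in your decorator behaves the same way: each worker sees only its own copy, so each worker rate-limits itself, not the group.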

The easiest way to do this would be to limit how quickly the iterator that the Pool pulls from produces results. For example:

import collections
import time

def rate_limited_iterator(iterable, limit_per_second):
    # Token-bucket style: the deque holds the times at which each of
    # the limit_per_second "slots" next becomes available. Initially,
    # we can run immediately limit_per_second times.
    runats = collections.deque([time.monotonic()] * limit_per_second)
    for x in iterable:
        runat, now = runats.popleft(), time.monotonic()
        if now < runat:
            time.sleep(runat - now)
        # This slot becomes available again one second from now
        runats.append(time.monotonic() + 1)
        yield x

def multi(vec):
    with Pool(5) as p:
        return list(p.imap(f, rate_limited_iterator(vec, 20)))
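
Note that imap matters here: it pulls input lazily as workers free up, whereas plain map would exhaust the whole rate-limited iterator before dispatching any work, so all the sleeping would happen up front instead of pacing the requests. To see the pacing in isolation, here is the generator timed outside a Pool (the same code as above, repeated only so the snippet is self-contained):

    import collections
    import time

    def rate_limited_iterator(iterable, limit_per_second):
        runats = collections.deque([time.monotonic()] * limit_per_second)
        for x in iterable:
            runat, now = runats.popleft(), time.monotonic()
            if now < runat:
                time.sleep(runat - now)
            runats.append(time.monotonic() + 1)
            yield x

    # With a budget of 5 items per second, pulling 7 items takes roughly
    # one second: the first 5 pass immediately, and the 6th and 7th wait
    # for their slots to refresh.
    start = time.monotonic()
    items = list(rate_limited_iterator(range(7), 5))
    elapsed = time.monotonic() - start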
answered Oct 14 '25 by ShadowRanger