I'm making API requests that are limited to 20 per second. Since each request takes about 0.5 s to get a response, I thought I'd use multiprocessing.Pool.map together with this rate-limiting decorator. So my code looks like:
```python
import time
from multiprocessing import Pool

def f(x):
    # do stuff (one API request per item)
    ...

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args, **kargs):
            elapsed = time.monotonic() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait > 0:
                time.sleep(leftToWait)
            ret = func(*args, **kargs)
            lastTimeCalled[0] = time.monotonic()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(20)
def multi(vec):
    p = Pool(5)
    return p.map(f, vec)
```
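As a sanity check, the decorator does throttle successive calls within a single process. A minimal timing sketch (the `ping` function and the 5-per-second limit are just for illustration):

```python
import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args, **kargs):
            # Sleep just long enough to keep calls minInterval apart
            elapsed = time.monotonic() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait > 0:
                time.sleep(leftToWait)
            ret = func(*args, **kargs)
            lastTimeCalled[0] = time.monotonic()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(5)  # at most 5 calls per second
def ping():
    return time.monotonic()

start = time.monotonic()
stamps = [ping() for _ in range(10)]
elapsed = time.monotonic() - start
# The first call runs immediately; the other 9 each wait ~0.2 s
```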
I have 4 cores, this program works fine, and there is an improvement in time compared to the loop version. Furthermore, with a Pool argument of 4, 5, or 6 it works, and the time is smallest for Pool(6), but with 7+ I get errors (too many connections per second, I guess).

Then, if my function is more complicated and can make 1-5 requests per call, the decorator doesn't work as expected. What else can I use in this case?
For anyone looking to use Pool: remember to close it, otherwise you are going to use up all the RAM.

```python
def multi(vec):
    p = Pool(5)
    res = p.map(f, vec)
    p.close()
    return res
```
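Since Pool's `with` statement calls `terminate()` on exit, you can get the same cleanup without the explicit `close()`. A sketch (with a stand-in `f`, since the real request function isn't shown), equivalent because `map()` has already returned before the pool is torn down:

```python
from multiprocessing import Pool

def f(x):
    return x * x  # stand-in for the real request function

def multi(vec):
    # Pool's context manager calls terminate() on __exit__;
    # map() has already returned, so all results are collected.
    with Pool(5) as p:
        return p.map(f, vec)

if __name__ == "__main__":
    print(multi([1, 2, 3]))
```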
I found that something like this WebRequestManager can do the trick. The problem is that it doesn't work with multiprocessing.Pool with 19-20 processes, because the time is stored in the class instance you have to call when you run the request.
Your indentation is inconsistent above, which makes this harder to answer, but I'll take a stab.

It looks like you're rate limiting the wrong thing: if f is supposed to be limited, you need to limit the calls to f, not the calls to multi. And doing this in something that's getting dispatched to the Pool won't work, because the forked workers would each be limiting independently (each forked process keeps its own record of the time since the last call).
The easiest way to do this is to limit how quickly the iterator that the Pool pulls from produces values. For example:
```python
import collections
import time
from multiprocessing import Pool

def rate_limited_iterator(iterable, limit_per_second):
    # Initially, we can run immediately limit_per_second times
    runats = collections.deque([time.monotonic()] * limit_per_second)
    for x in iterable:
        runat, now = runats.popleft(), time.monotonic()
        if now < runat:
            time.sleep(runat - now)
        # The slot just consumed becomes available again one second from now
        runats.append(time.monotonic() + 1)
        yield x

def multi(vec):
    with Pool(5) as p:
        return list(p.imap(f, rate_limited_iterator(vec, 20)))
```
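To check that the iterator alone enforces the limit (no Pool needed), you can time it directly; the 25-item / 10-per-second numbers here are arbitrary:

```python
import collections
import time

def rate_limited_iterator(iterable, limit_per_second):
    # Sliding window: at most limit_per_second items per second
    runats = collections.deque([time.monotonic()] * limit_per_second)
    for x in iterable:
        runat, now = runats.popleft(), time.monotonic()
        if now < runat:
            time.sleep(runat - now)
        runats.append(time.monotonic() + 1)
        yield x

start = time.monotonic()
items = list(rate_limited_iterator(range(25), 10))
elapsed = time.monotonic() - start
# 10 items pass immediately, the next 10 after ~1 s, the last 5 after ~2 s
```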