I have a pretty straightforward piece of code where I load a list of IDs from a file, then iterate through each ID in the list, call an API passing the ID value, and dump the API response content into a file.
I would like to speed this process up by making parallel API calls; however, the API server allows at most 5 calls per second. Another key consideration is that the API pull is slow: on average, each call takes 10 seconds to finish.
I would like to have multiple parallel processes with some way of ensuring that no more than 5 calls occur in any single second.
This is the current code:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed

ids = pd.read_csv('data.csv')
ids = ids['Id'].values.tolist()

def dump_data(df, idx):
    # base_dir is assumed to be defined elsewhere
    filename = base_dir + '\\' + str(idx) + '.csv'
    df.to_csv(filename, header=True, index=False)  # write data to file

def get_api(idx):
    data = call_some_api(idx)  # API returns data as a pandas DataFrame; takes about 10 secs
    dump_data(data, idx)

Parallel(n_jobs=10, verbose=50)(delayed(get_api)(idx) for idx in ids)
I'm currently using joblib, but if there is a better library for this solution it can be used instead.
How can I ensure there will not be more than 5 requests going out in any given second (while at the same time doing all the requests as fast as possible)?
Also, I'm using Python 3.9 on Windows.
Update 2
After rethinking this a bit more, it makes more sense to use either a standard multithreading pool or multiprocessing pool, depending on your needs, and then to pass to the worker function (either indirectly as a global or explicitly as an argument) a CallingThrottle instance whose throttle method can be called directly by the worker function at the precise point in processing where throttling needs to take place (right before making the request to the website). Passing the throttle instance directly as an argument to your worker function should allow you to use this with joblib (although I would think that in your case all you need is a multithreading pool).
For example:
from multiprocessing.pool import ThreadPool, Pool
from multiprocessing.managers import BaseManager
from threading import Lock
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()
        self.lock = Lock()

    def throttle(self):
        with self.lock:
            while len(self.called_timestamps) == self.nb_call_times_limit:
                now = time.time()
                # Discard timestamps that have fallen outside the window:
                self.called_timestamps = list(filter(
                    lambda x: now - x < self.expired_time,
                    self.called_timestamps
                ))
                if len(self.called_timestamps) == self.nb_call_times_limit:
                    time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                    time.sleep(time_to_sleep)
            self.called_timestamps.append(time.time())

# A "managed" CallingThrottle is required for use with multiprocessing:
class CallingThrottleManager(BaseManager):
    pass

CallingThrottleManager.register('CallingThrottle', CallingThrottle)

def init_pool(throttle):
    global calling_throttle
    calling_throttle = throttle

def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 5 of these per second.
    """
    from datetime import datetime

    calling_throttle.throttle()
    print(datetime.now(), 'x =', x)
    time.sleep(10)
    return x, x * x

def main():
    # Multithreading example:
    calling_throttle = CallingThrottle(5, 1)  # 5 calls every 1 second
    pool = ThreadPool(20)
    init_pool(calling_throttle)
    start = time.time()
    results = pool.map(worker, range(20))
    print('Total elapsed time:', time.time() - start)
    pool.close()
    pool.join()

    print('\n', '-' * 30, '\n', sep='')

    # Multiprocessing example:
    with CallingThrottleManager() as manager:
        calling_throttle = manager.CallingThrottle(5, 1)  # 5 calls every 1 second
        pool = Pool(20, initializer=init_pool, initargs=(calling_throttle,))
        start = time.time()
        results = pool.map(worker, range(20))
        print('Total elapsed time:', time.time() - start)
        pool.close()
        pool.join()

if __name__ == '__main__':
    main()
To use a throttle with joblib:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed
from multiprocessing.managers import BaseManager
from threading import Lock
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()
        self.lock = Lock()

    def throttle(self):
        with self.lock:
            while len(self.called_timestamps) == self.nb_call_times_limit:
                now = time.time()
                # Discard timestamps that have fallen outside the window:
                self.called_timestamps = list(filter(
                    lambda x: now - x < self.expired_time,
                    self.called_timestamps
                ))
                if len(self.called_timestamps) == self.nb_call_times_limit:
                    time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                    time.sleep(time_to_sleep)
            self.called_timestamps.append(time.time())

# A "managed" CallingThrottle is required for use with multiprocessing:
class CallingThrottleManager(BaseManager):
    pass

def dump_data(df, idx):
    # base_dir is assumed to be defined elsewhere
    filename = base_dir + '\\' + str(idx) + '.csv'
    df.to_csv(filename, header=True, index=False)  # write data to file

def get_api(calling_throttle, idx):
    calling_throttle.throttle()
    data = call_some_api(idx)  # API returns data as a pandas DataFrame; takes about 10 secs
    dump_data(data, idx)

def main():
    ids = pd.read_csv('data.csv')
    ids = ids['Id'].values.tolist()

    CallingThrottleManager.register('CallingThrottle', CallingThrottle)
    with CallingThrottleManager() as manager:
        calling_throttle = manager.CallingThrottle(5, 1)  # 5 calls every 1 second
        Parallel(n_jobs=10, verbose=50)(
            delayed(get_api)(calling_throttle, idx) for idx in ids
        )

if __name__ == '__main__':
    main()
Update 1
I originally implemented the rate-limiting algorithm referenced in the comment made by @balmy, but noticed that there are times when the rate can be exceeded. This phenomenon was commented on by @mindvirus, where the OP was trying for 5 messages in an 8-second period:
This is good, but can exceed the rate. Let's say at time 0 you forward 5 messages, then at time N * (8/5) for N = 1, 2, ... you can send another message, resulting in more than 5 messages in an 8 second period.
So I am now using a new rate-limiting algorithm.
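To see the problem concretely, here is a minimal sketch (my own illustration of the scenario described in that comment, not code from the original algorithm):

# Hypothetical sketch of the flawed scheme: an initial burst of 5
# messages at t=0, then one message every 8/5 = 1.6 seconds thereafter.
RATE, PER = 5, 8.0
send_times = [0.0] * RATE + [n * PER / RATE for n in range(1, 6)]

# Count the sends that fall inside the single 8-second window [0, 8):
in_window = [t for t in send_times if 0.0 <= t < PER]
print(f'{len(in_window)} messages in one {PER:.0f}-second window')  # 9 > 5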
I have created two classes, RateLimitedProcessPool and RateLimitedThreadPool, for multiprocessing and multithreading respectively, based on the algorithm presented in What's a good rate limiting algorithm?. These classes are like the standard multiprocessing.pool.Pool and multiprocessing.pool.ThreadPool classes, except that their __init__ methods take two extra keyword arguments, rate and per, which together specify the maximum rate at which the apply_async method can be called. For example, rate=7 and per=3 means that successive calls to apply_async will throttle so as to allow a maximum of 7 calls every 3 seconds.
The following code demonstrates this with a simple worker function that emulates the OP's situation: the worker function takes 10 seconds to execute and must be limited to a maximum rate of 5 calls per second. We need to invoke this function 20 times, so the best performance we can achieve is a total run time of approximately 13 seconds (four batches of 5 tasks start at roughly t = 0, 1, 2 and 3 seconds, and the final batch takes 10 seconds to run: 3 + 10 = 13).
import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()

    def __enter__(self):
        while len(self.called_timestamps) == self.nb_call_times_limit:
            now = time.time()
            # Discard timestamps that have fallen outside the window:
            self.called_timestamps = list(filter(
                lambda x: now - x < self.expired_time,
                self.called_timestamps
            ))
            if len(self.called_timestamps) == self.nb_call_times_limit:
                time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                time.sleep(time_to_sleep)
        self.called_timestamps.append(time.time())

    def __exit__(self, *exc):
        pass

class RateLimitedPool:
    def __init__(self, rate, per):
        self.calling_throttle = CallingThrottle(rate, per)
        self.first_time = True

    def apply_async(self, *args, **kwargs):
        # There could be a lag between the first call to apply_async
        # and the first task actually starting, so set the first time
        # after the call to apply_async:
        if self.first_time:
            self.first_time = False
            async_result = super().apply_async(*args, **kwargs)
            with self.calling_throttle:
                pass
            return async_result
        else:
            with self.calling_throttle:
                return super().apply_async(*args, **kwargs)

class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)
class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

########################################

def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 5 of these per second.
    """
    from datetime import datetime

    print(datetime.now(), 'x =', x)
    time.sleep(10)
    return x, x * x

def main():
    args = range(20)
    pool = RateLimitedThreadPool(20, rate=5, per=1)  # 5 per second
    start = time.time()
    for x in args:
        pool.apply_async(worker, args=(x,))
    # Wait for all tasks to complete:
    pool.close()
    pool.join()
    print('Total elapsed time:', time.time() - start)

if __name__ == '__main__':
    main()
Prints:
2021-10-03 07:19:48.002628 x = 0
2021-10-03 07:19:48.002628 x = 1
2021-10-03 07:19:48.002628 x = 3
2021-10-03 07:19:48.002628 x = 4
2021-10-03 07:19:48.002628 x = 2
2021-10-03 07:19:49.005625 x = 5
2021-10-03 07:19:49.005625 x = 6
2021-10-03 07:19:49.005625 x = 8
2021-10-03 07:19:49.005625 x = 7
2021-10-03 07:19:49.005625 x = 9
2021-10-03 07:19:50.008775 x = 10
2021-10-03 07:19:50.008775 x = 11
2021-10-03 07:19:50.008775 x = 13
2021-10-03 07:19:50.008775 x = 12
2021-10-03 07:19:50.008775 x = 14
2021-10-03 07:19:51.012774 x = 15
2021-10-03 07:19:51.012774 x = 16
2021-10-03 07:19:51.012774 x = 17
2021-10-03 07:19:51.012774 x = 18
2021-10-03 07:19:51.012774 x = 19
Total elapsed time: 13.015560150146484
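As an aside, the threadpool and processpool decorators defined above are not exercised by this demo. Here is a minimal sketch of how the threadpool decorator could be used with a RateLimitedThreadPool (the square function is a made-up example):

# Hypothetical usage of the threadpool decorator defined above. Each
# decorated call submits the work to the rate-limited pool and
# immediately returns an AsyncResult.
pool = RateLimitedThreadPool(20, rate=5, per=1)

@threadpool(pool)
def square(x):
    return x * x

async_results = [square(x) for x in range(20)]
print([result.get() for result in async_results])  # block for the values
pool.close()
pool.join()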
CPU Intensive Example
In the following example I am using a RateLimitedProcessPool, since my worker function is 100% CPU-bound, taking approximately 10 seconds to execute on my desktop. I only have 8 logical cores (4 physical cores), so my pool size is 8, and for this demo I am submitting 8 tasks at a rate of 3 tasks per second. The second group of 3 tasks will start approximately 1 second after the first 3, and the final 2 tasks will start 1 second after that. Because the number of physical cores becomes a limiting factor, the total running time is a little over 21 seconds.
import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps
import time

class RateLimitedPool:
    # There is a lag between the first call to apply_async and the first
    # task actually starting:
    LAG_TIME = .2  # seconds - needs to be fine-tuned

    def __init__(self, rate, per):
        assert isinstance(rate, int) and rate > 0
        assert isinstance(per, (int, float)) and per > 0
        self.rate = rate
        self.per = per
        self.count = 0
        self.start_time = None
        self.first_time = True

    def _check_allowed(self):
        current_time = time.time()
        if self.start_time is None:
            self.start_time = current_time
            self.count = 1
            return True
        elapsed_time = current_time - self.start_time
        if self.first_time:
            elapsed_time -= self.LAG_TIME
        if elapsed_time >= self.per:
            # A full period has elapsed; start a new one:
            self.start_time = current_time
            self.count = 1
            self.first_time = False
            return True
        if self.count < self.rate:
            self.count += 1
            return True
        return False

    def apply_async(self, *args, **kwargs):
        while not self._check_allowed():
            time.sleep(.1)  # This can be fine-tuned
        return super().apply_async(*args, **kwargs)

class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

########################################

ONE_SECOND_ITERATIONS = 20_000_000

def one_second():
    # Roughly one second of pure CPU work on the author's desktop:
    total = 0
    for _ in range(ONE_SECOND_ITERATIONS):
        total += 1
    return total

def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 3 of these per second.
    """
    from datetime import datetime

    print(datetime.now(), 'x =', x)
    for _ in range(10):
        one_second()
    return x, x * x

def main():
    args = range(8)
    pool = RateLimitedProcessPool(8, rate=3, per=1)  # 3 per second
    start = time.time()
    for x in args:
        pool.apply_async(worker, args=(x,))
    # Wait for all tasks to complete:
    pool.close()
    pool.join()
    print('Total elapsed time:', time.time() - start)

if __name__ == '__main__':
    main()
Prints:
2021-10-03 09:51:32.857166 x = 0
2021-10-03 09:51:32.859168 x = 1
2021-10-03 09:51:32.864166 x = 2
2021-10-03 09:51:33.899890 x = 5
2021-10-03 09:51:33.899890 x = 3
2021-10-03 09:51:33.907888 x = 4
2021-10-03 09:51:34.924889 x = 6
2021-10-03 09:51:34.925888 x = 7
Total elapsed time: 21.22123622894287