Rate limiting API calls across multiple processes

I have some pretty straightforward code: I load a list of IDs from a file, then iterate through the list, calling an API with each ID and dumping the API response content into a file.

I would like to speed this up with parallel API calls, but the API server allows at most 5 calls per second. Another key consideration is that the API is slow; on average each call takes 10 seconds to finish.

I would like to have multiple parallel processes with some way of ensuring that no more than 5 calls occur in any single second.

This is the current code:

import pandas as pd
from joblib import Parallel, delayed

base_dir = '.'  # output directory (stand-in; defined elsewhere in the real script)

ids = pd.read_csv('data.csv')
ids = ids['Id'].values.tolist()

def dump_data(data, idx):
    filename = base_dir + '\\' + str(idx) + '.csv'
    data.to_csv(filename, header=True, index=False)  # write data to file

def get_api(idx):
    data = call_some_api(idx)  # API returns data as a pandas DataFrame, takes about 10 secs
    dump_data(data, idx)


Parallel(n_jobs=10, verbose=50)(delayed(get_api)(idx) for idx in ids)

I'm currently using joblib, but if there is a better library for this, I'm happy to use it instead.

How can I ensure there will never be more than 5 requests going out in any given second, while at the same time completing all the requests as fast as possible?

Also, I'm using Python 3.9 on Windows.

asked Nov 27 '22 by Mustard Tiger


1 Answer

Update 2

After rethinking this a bit more, it makes more sense to use a standard multithreading pool or multiprocessing pool, depending on your needs, and to pass to the worker function (either indirectly as a global or explicitly as an argument) a CallingThrottle instance whose throttle method the worker calls at the precise point where throttling needs to take place, i.e. right before making the request to the website. Passing the throttle instance explicitly as an argument is what allows you to use this with joblib (though I would think that in your case a multithreading pool is all you need).

For example:

from multiprocessing.pool import ThreadPool, Pool
from multiprocessing.managers import BaseManager
from threading import Lock
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()
        self.lock = Lock()

    def throttle(self):
        # Block until fewer than nb_call_times_limit calls have been made
        # within the last expired_time seconds (a sliding window):
        with self.lock:
            while len(self.called_timestamps) == self.nb_call_times_limit:
                now = time.time()
                # Discard timestamps that have aged out of the window:
                self.called_timestamps = list(filter(
                    lambda x: now - x < self.expired_time,
                    self.called_timestamps
                ))
                if len(self.called_timestamps) == self.nb_call_times_limit:
                    # Window still full; sleep until the oldest call expires:
                    time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                    time.sleep(time_to_sleep)
            self.called_timestamps.append(time.time())

# A "managed" CallingThrottle is required for use with multiprocessing:
class CallingThrottleManager(BaseManager):
    pass

CallingThrottleManager.register('CallingThrottle', CallingThrottle)

def init_pool(throttle):
    global calling_throttle

    calling_throttle = throttle

def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 5 of these per second.
    """
    from datetime import datetime
    calling_throttle.throttle()
    print(datetime.now(), 'x =', x)
    time.sleep(10)
    return x, x * x

def main():
    # Multithreading example:
    calling_throttle = CallingThrottle(5, 1) # 5 calls every 1 second
    pool = ThreadPool(20)
    init_pool(calling_throttle)
    start = time.time()
    results = pool.map(worker, range(20))
    print('Total elapsed time:', time.time() - start)
    pool.close()
    pool.join()

    print('\n', '-' * 30, '\n', sep='')

    # Multiprocessing example:
    with CallingThrottleManager() as manager:
        calling_throttle = manager.CallingThrottle(5, 1) # 5 calls every 1 second
        pool = Pool(20, initializer=init_pool, initargs=(calling_throttle,))
        start = time.time()
        results = pool.map(worker, range(20))
        print('Total elapsed time:', time.time() - start)
        pool.close()
        pool.join()

if __name__ == '__main__':
    main()
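
Note that CallingThrottle enforces a true sliding window: a new call is admitted only once fewer than nb_call_times_limit calls have occurred within the last expired_time seconds, which is what avoids the burst problem described under Update 1 below.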

To use a throttle with joblib:

import pandas as pd
from joblib import Parallel, delayed
from multiprocessing.managers import BaseManager
from threading import Lock
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()
        self.lock = Lock()

    def throttle(self):
        with self.lock:
            while len(self.called_timestamps) == self.nb_call_times_limit:
                now = time.time()
                self.called_timestamps = list(filter(
                    lambda x: now - x < self.expired_time,
                    self.called_timestamps
                ))
                if len(self.called_timestamps) == self.nb_call_times_limit:
                    time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                    time.sleep(time_to_sleep)
            self.called_timestamps.append(time.time())

# A "managed" CallingThrottle is required for use with multiprocessing:
class CallingThrottleManager(BaseManager):
    pass

def dump_data(data, idx):
    filename = base_dir + '\\' + str(idx) + '.csv'
    data.to_csv(filename, header=True, index=False)  # write data to file

def get_api(calling_throttle, idx):
    calling_throttle.throttle()  # block here until a call slot is free
    data = call_some_api(idx)  # API returns data as a pandas DataFrame, takes about 10 secs
    dump_data(data, idx)


def main():
    ids = pd.read_csv('data.csv')
    ids = ids['Id'].values.tolist()

    CallingThrottleManager.register('CallingThrottle', CallingThrottle)

    with CallingThrottleManager() as manager:
        calling_throttle = manager.CallingThrottle(5, 1) # 5 calls every 1 second
        Parallel(n_jobs=10, verbose=50)(delayed(get_api)(calling_throttle, idx) for idx in ids)

if __name__ == '__main__':
    main()
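
As an aside, if threads are sufficient (the task is I/O-bound), joblib's threading backend keeps all workers in one process, so a plain CallingThrottle can be shared with no manager at all. A minimal sketch, assuming the CallingThrottle class, dump_data, and the question's (hypothetical) call_some_api are defined as above:

import pandas as pd
from joblib import Parallel, delayed

def get_api(calling_throttle, idx):
    calling_throttle.throttle()  # block until a call slot is free
    data = call_some_api(idx)    # the question's API call
    dump_data(data, idx)

def main():
    ids = pd.read_csv('data.csv')['Id'].values.tolist()
    calling_throttle = CallingThrottle(5, 1)  # 5 calls every 1 second
    # backend='threading' runs everything in one process, so no proxy is needed:
    Parallel(n_jobs=10, backend='threading', verbose=50)(
        delayed(get_api)(calling_throttle, idx) for idx in ids
    )

if __name__ == '__main__':
    main()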

Update 1

I originally implemented the rate-limiting algorithm referenced in the comment made by @balmy, but noticed that there are times when the rate can be exceeded. @mindvirus commented on this phenomenon in a case where the OP wanted 5 messages in an 8-second period:

This is good, but can exceed the rate. Let's say at time 0 you forward 5 messages, then at time N * (8/5) for N = 1, 2, ... you can send another message, resulting in more than 5 messages in an 8 second period.
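
To see why, here is roughly that style of limiter in Python (my paraphrase of the algorithm discussed there, not the exact posted code):

import time

rate = 5.0   # allowed calls ...
per = 8.0    # ... per this many seconds
allowance = rate
last_check = time.time()

def allowed():
    """Token-bucket check: the allowance refills continuously at rate/per."""
    global allowance, last_check
    now = time.time()
    allowance += (now - last_check) * (rate / per)
    last_check = now
    if allowance > rate:
        allowance = rate  # cap the burst size
    if allowance < 1.0:
        return False      # too soon; caller should wait and retry
    allowance -= 1.0
    return True

# At t = 0 a burst of 5 calls is allowed; from then on one more call is
# admitted roughly every per/rate = 1.6 seconds, so the interval (0, 8]
# can end up containing more than 5 calls.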

So I am now using a new rate-limiting algorithm.


I have created two classes, RateLimitedProcessPool and RateLimitedThreadPool, for multiprocessing and multithreading respectively, based on the algorithm presented in What's a good rate limiting algorithm?. These classes are like the standard multiprocessing.pool.Pool and multiprocessing.pool.ThreadPool classes except that their __init__ methods take two extra keyword arguments, rate and per, which together specify the maximum rate at which the apply_async method can be called. For example, rate=7 and per=3 means that successive calls to apply_async will throttle so as to allow at most 7 calls every 3 seconds.

The following code demonstrates this with a simple worker function that emulates the OP's situation: the worker takes 10 seconds to execute and must be limited to a maximum rate of 5 calls per second. We need to invoke this function 20 times, so the last batch of 5 starts about 3 seconds after the first and then runs for 10 seconds; the best total run time we can achieve is therefore approximately 13 seconds.

import multiprocessing.pool
from functools import wraps
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()

    def __enter__(self):
        while len(self.called_timestamps) == self.nb_call_times_limit:
            now = time.time()
            self.called_timestamps = list(filter(
                lambda x: now - x < self.expired_time,
                self.called_timestamps
            ))
            if len(self.called_timestamps) == self.nb_call_times_limit:
                time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                time.sleep(time_to_sleep)
        self.called_timestamps.append(time.time())

    def __exit__(self, *exc):
        pass

class RateLimitedPool:
    def __init__(self, rate, per):
        self.calling_throttle = CallingThrottle(rate, per)
        self.first_time = True

    def apply_async(self, *args, **kwargs):
        # There could be a lag between the first call to apply_async
        # and the first task actually starting, so set the first time
        # after the call to apply_async:
        if self.first_time:
            self.first_time = False
            async_result = super().apply_async(*args, **kwargs)
            with self.calling_throttle:
                pass
            return async_result
        else:
            with self.calling_throttle:
                return super().apply_async(*args, **kwargs)

class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

# Convenience decorators (not used in the demo below) that submit the
# decorated function to the given pool via apply_async:
def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

########################################


def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 5 of these per second.
    """
    from datetime import datetime
    print(datetime.now(), 'x =', x)
    time.sleep(10)
    return x, x * x

def main():
    args = range(20)
    pool = RateLimitedThreadPool(20, rate=5, per=1) # 5 per second
    start = time.time()
    for x in args:
        pool.apply_async(worker, args=(x,))
    # Wait for all tasks to complete
    pool.close()
    pool.join()
    print('Total elapsed time:', time.time() - start)

if __name__ == '__main__':
    main()

Prints:

2021-10-03 07:19:48.002628 x =  0
2021-10-03 07:19:48.002628 x =  1
2021-10-03 07:19:48.002628 x =  3
2021-10-03 07:19:48.002628 x =  4
2021-10-03 07:19:48.002628 x =  2
2021-10-03 07:19:49.005625 x =  5
2021-10-03 07:19:49.005625 x =  6
2021-10-03 07:19:49.005625 x =  8
2021-10-03 07:19:49.005625 x =  7
2021-10-03 07:19:49.005625 x =  9
2021-10-03 07:19:50.008775 x =  10
2021-10-03 07:19:50.008775 x =  11
2021-10-03 07:19:50.008775 x =  13
2021-10-03 07:19:50.008775 x =  12
2021-10-03 07:19:50.008775 x =  14
2021-10-03 07:19:51.012774 x =  15
2021-10-03 07:19:51.012774 x =  16
2021-10-03 07:19:51.012774 x =  17
2021-10-03 07:19:51.012774 x =  18
2021-10-03 07:19:51.012774 x =  19
Total elapsed time: 13.015560150146484
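
The timestamps show the throttle doing its job: exactly 5 tasks start in each of 4 successive seconds, and the total of roughly 13 seconds is the 3 seconds of staggered starts plus the 10-second task time.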

CPU Intensive Example

In the following example I am using a RateLimitedProcessPool, since my worker function is 100% CPU-bound, taking approximately 10 seconds to execute on my desktop. I have only 8 logical cores (4 physical cores), so my pool size is 8, and for this demo I am submitting 8 tasks at a rate of 3 tasks per second. The second 3 tasks start approximately 1 second after the first 3, and the next 2 tasks start 1 second after that. Because the number of physical cores becomes a limiting factor, the total running time is a little over 21 seconds.

import multiprocessing.pool
from functools import wraps
import time

class RateLimitedPool:
    # There is a lag between the first call to apply_async and the first task actually starting:
    LAG_TIME = .2 # seconds - needs to be fine-tuned

    def __init__(self, rate, per):
        assert isinstance(rate, int) and rate > 0
        assert isinstance(per, (int, float)) and per > 0
        self.rate = rate
        self.per = per
        self.count = 0
        self.start_time = None
        self.first_time = True

    def _check_allowed(self):
        current_time = time.time()
        if self.start_time is None:
            self.start_time = current_time
            self.count = 1
            return True
        elapsed_time = current_time - self.start_time
        if self.first_time:
            elapsed_time -= self.LAG_TIME
        if elapsed_time >= self.per:
            self.start_time = current_time
            self.count = 1
            self.first_time = False
            return True
        if self.count < self.rate:
            self.count += 1
            return True
        return False

    def apply_async(self, *args, **kwargs):
        while not self._check_allowed():
            time.sleep(.1) # This can be fine-tuned
        return super().apply_async(*args, **kwargs)

class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

# Convenience decorators (not used in the demo below) that submit the
# decorated function to the given pool via apply_async:
def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

########################################

ONE_SECOND_ITERATIONS = 20_000_000

def one_second():
    sum = 0
    for _ in range(ONE_SECOND_ITERATIONS):
        sum += 1
    return sum

def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 3 of these per second.
    """
    from datetime import datetime
    print(datetime.now(), 'x = ', x)
    for _ in range(10):
        one_second()
    return x, x * x

def main():
    args = range(8)
    pool = RateLimitedProcessPool(8, rate=3, per=1) # 3 per second
    start = time.time()
    for x in args:
        pool.apply_async(worker, args=(x,))
    # Wait for all tasks to complete
    pool.close()
    pool.join()
    print('Total elapsed time:', time.time() - start)

if __name__ == '__main__':
    main()

Prints:

2021-10-03 09:51:32.857166 x =  0
2021-10-03 09:51:32.859168 x =  1
2021-10-03 09:51:32.864166 x =  2
2021-10-03 09:51:33.899890 x =  5
2021-10-03 09:51:33.899890 x =  3
2021-10-03 09:51:33.907888 x =  4
2021-10-03 09:51:34.924889 x =  6
2021-10-03 09:51:34.925888 x =  7
Total elapsed time: 21.22123622894287
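
Again the timestamps match the expectation: 3 tasks start immediately, 3 more roughly a second later, and the final 2 a second after that. Since 8 CPU-bound tasks of about 10 seconds each must share 4 physical cores, roughly 20 seconds of compute is unavoidable, so the observed 21 seconds is close to optimal.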
answered Nov 29 '22 by Booboo