Rate Limit Downloads Amongst Multiple Processes

I want to download and process a lot of files from a website. The site's terms of service restrict the number of files you're permitted to download per second.

The time it takes to process the files is actually the bottleneck, so I'd like to be able to process multiple files in parallel. But I don't want the separate processes to combine to violate the download limit, so I need something that limits the overall request rate. I was thinking of something like the following, but I'm not exactly an expert with the multiprocessing module.

import multiprocessing
import queue
import time
from multiprocessing.managers import BaseManager

class DownloadLimiter(object):

    def __init__(self, delay):
        # Named `delay` rather than `time` so the time module isn't shadowed.
        self.delay = delay
        self.lock = multiprocessing.Lock()

    def get(self, url):
        # Hold the lock while sleeping so that, across all workers,
        # at most one download starts per `delay` seconds.
        with self.lock:
            time.sleep(self.delay)
        return url


class DownloadManager(BaseManager):
    pass

DownloadManager.register('downloader', DownloadLimiter)


class Worker(multiprocessing.Process):

    def __init__(self, downloader, queue, file_name):
        super().__init__()
        self.downloader = downloader
        self.file_name = file_name
        self.queue = queue

    def run(self):
        while True:
            try:
                # Queue.empty() is unreliable across processes, so use a
                # timed get and treat a timeout as "no more work".
                url = self.queue.get(timeout=1)
            except queue.Empty:
                break
            content = self.downloader.get(url)
            with open(self.file_name, "a+") as fh:
                fh.write(str(content) + "\n")

Then somewhere else, I run the downloads with:

manager = DownloadManager()
manager.start()
downloader = manager.downloader(0.5)
queue = multiprocessing.Queue()

urls = range(50)
for url in urls:
    queue.put(url)

job1 = Worker(downloader, queue, r"foo.txt")
job2 = Worker(downloader, queue, r"bar.txt")
jobs = [job1, job2]

for job in jobs:
    job.start()

for job in jobs:
    job.join()

This seems to do the job on a small scale, but I'm a little wary about whether the locking is really being done correctly.

Also, if there's a better pattern for achieving the same goal, I'd love to hear it.
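
For comparison, here is a minimal sketch (not from the original post) of a limiter that reserves the next start slot while holding the lock and sleeps outside of it, so the lock is held only momentarily rather than for the whole delay:

import multiprocessing
import time


class IntervalLimiter(object):
    """Sketch: allow at most one download start per `interval` seconds."""

    def __init__(self, interval):
        self.interval = interval
        self.lock = multiprocessing.Lock()
        self.next_start = time.monotonic()

    def wait(self):
        # Reserve the next free start slot under the lock...
        with self.lock:
            now = time.monotonic()
            start = max(now, self.next_start)
            self.next_start = start + self.interval
        # ...then sleep outside the lock until that slot arrives.
        time.sleep(max(0.0, start - now))

Served through the same BaseManager pattern as above, all calls execute in the manager process, so a plain attribute like next_start is shared correctly.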

asked Nov 06 '22 by Batman

2 Answers

This can be done cleanly with Ray, which is a library for parallel and distributed Python.

Resources in Ray

When you start Ray, you can tell it what resources are available on that machine. Ray will automatically attempt to determine the number of CPU cores and GPUs, but these can be specified explicitly, and arbitrary user-defined resources can be passed in as well, e.g., by calling

ray.init(num_cpus=4, resources={'Network': 2})

This tells Ray that the machine has 4 CPU cores and 2 of a user-defined resource called Network.
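
You can check what was actually registered; a quick sketch, assuming the ray.init call above:

# Totals registered with Ray, e.g. {'CPU': 4.0, 'Network': 2.0, ...}
print(ray.cluster_resources())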

Each Ray "task", which is a schedulable unit of work, has certain resource requirements. By default, a task will require 1 CPU core and nothing else. However, arbitrary resource requirements can be specified by declaring the corresponding function with

@ray.remote(resources={'Network': 1})
def f():
    pass

This tells Ray that in order for f to execute on a "worker" process, there must be 1 CPU core (the default value) and 1 Network resource available.

Since the machine has 2 of the Network resource and 4 CPU cores, at most 2 copies of f can be executed concurrently. On the other hand, if there is another function g declared with

@ray.remote
def g():
    pass

then four copies of g can be executed concurrently, or two copies of f and two copies of g can be executed concurrently.

Example

Here's an example, with placeholders for the actual functions used to download the content and process the content.

import ray
import time

max_concurrent_downloads = 2

ray.init(num_cpus=4, resources={'Network': max_concurrent_downloads})

@ray.remote(resources={'Network': 1})
def download_content(url):
    # Download the file.
    time.sleep(1)
    return 'result from ' + url

@ray.remote
def process_result(result):
    # Process the result.
    time.sleep(1)
    return 'processed ' + result

urls = ['url1', 'url2', 'url3', 'url4']

result_ids = [download_content.remote(url) for url in urls]

processed_ids = [process_result.remote(result_id) for result_id in result_ids]

# Wait until the tasks have finished and retrieve the results.
processed_results = ray.get(processed_ids)

Here's a timeline depiction (which you can produce by running ray timeline from the command line and opening the resulting JSON file in chrome://tracing in the Chrome web browser).

In the above script, we submit 4 download_content tasks. Those are the ones that we rate limit by specifying that they require the Network resource (in addition to the default 1 CPU resource). Then we submit 4 process_result tasks, which each require the default 1 CPU resource. The tasks are executed in three stages (visible as the blue boxes in the timeline).

  1. We start by executing 2 download_content tasks, which is as many as can execute at one time (because of the rate limiting). We can't execute any of the process_result tasks yet because they depend on the output of the download_content tasks.
  2. Those finish, so we start executing the remaining two download_content tasks as well as two process_result tasks, because we are not rate limiting the process_result tasks.
  3. We execute the remaining process_result tasks.

Each "row" is one worker process. Time goes from left to right.


You can read more about how to do this in the Ray documentation.

answered Nov 15 '22 by Robert Nishihara

OK, after the following clarification from the OP:

By "downloads per second" I mean that globally there are no more than [a given number of] downloads started per second.

I decided to post another answer, as I think my first one could also be of interest to somebody looking to limit the number of concurrently running processes.

I think there is no need to use additional frameworks to solve this problem. The idea is to use downloading threads, one spawned per resource link, a resource queue, and a fixed number of processing workers, which are processes, not threads:

#!/usr/bin/env python3
import os
import time
import random
from threading import Thread
from multiprocessing import Process, JoinableQueue


WORKERS = 4
DOWNLOADS_PER_SECOND = 2


def download_resource(url, resource_queue):
    # Note: all downloader threads run in the main process, so this PID is
    # the same for every thread (as the sample output below shows).
    pid = os.getpid()

    t = time.strftime('%H:%M:%S')
    print('Thread {p} is downloading from {u} ({t})'.format(p=pid, u=url, t=t),
          flush=True)
    time.sleep(random.randint(1, 10))

    results = '[resource {}]'.format(url)
    resource_queue.put(results)


def process_resource(resource_queue):
    pid = os.getpid()

    while True:
        res = resource_queue.get()

        print('Process {p} is processing {r}'.format(p=pid, r=res),
              flush=True)
        time.sleep(random.randint(1, 10))

        resource_queue.task_done()


def main():
    resource_queue = JoinableQueue()

    # Start process workers:
    for _ in range(WORKERS):
        worker = Process(target=process_resource,
                         args=(resource_queue,),
                         daemon=True)
        worker.start()

    urls = ['https://link/to/resource/{i}'.format(i=i) for i in range(10)]

    while urls:
        target_urls = urls[:DOWNLOADS_PER_SECOND]
        urls = urls[DOWNLOADS_PER_SECOND:]

        # Start downloader threads:
        for url in target_urls:
            downloader = Thread(target=download_resource,
                                args=(url, resource_queue),
                                daemon=True)
            downloader.start()

        time.sleep(1)

    resource_queue.join()


if __name__ == '__main__':
    main()

The results look something like this:

$ ./limit_download_rate.py
Thread 32482 is downloading from https://link/to/resource/0 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/1 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/2 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/3 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/4 (10:14:10)
Thread 32482 is downloading from https://link/to/resource/5 (10:14:10)
Process 32483 is processing [resource https://link/to/resource/2]
Process 32484 is processing [resource https://link/to/resource/0]
Thread 32482 is downloading from https://link/to/resource/6 (10:14:11)
Thread 32482 is downloading from https://link/to/resource/7 (10:14:11)
Process 32485 is processing [resource https://link/to/resource/1]
Process 32486 is processing [resource https://link/to/resource/3]
Thread 32482 is downloading from https://link/to/resource/8 (10:14:12)
Thread 32482 is downloading from https://link/to/resource/9 (10:14:12)
Process 32484 is processing [resource https://link/to/resource/6]
Process 32485 is processing [resource https://link/to/resource/9]
Process 32483 is processing [resource https://link/to/resource/8]
Process 32486 is processing [resource https://link/to/resource/4]
Process 32485 is processing [resource https://link/to/resource/7]
Process 32483 is processing [resource https://link/to/resource/5]

Here, every second, DOWNLOADS_PER_SECOND threads are started (two in this example), which then download resources and put them into the queue. WORKERS is the number of processes that get resources from the queue for further processing. With this setup, you'll be able to limit the number of downloads started per second and have the workers process the obtained resources in parallel.
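
To turn the download placeholder into a real fetch, the standard library is enough. A sketch, where the timeout, error handling, and the (url, body) tuple format are illustrative assumptions rather than part of the original answer:

import urllib.request


def download_resource(url, resource_queue):
    # Real fetch replacing the time.sleep placeholder above.
    try:
        with urllib.request.urlopen(url, timeout=30) as response:
            body = response.read()
    except OSError as exc:  # urllib.error.URLError subclasses OSError
        print('Download of {u} failed: {e}'.format(u=url, e=exc), flush=True)
        return
    resource_queue.put((url, body))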

answered Nov 15 '22 by constt