I want to download and process a lot of files from a website. The site's terms of service restrict the number of files you're permitted to download per second.
The time it takes to process the files is actually the bottleneck, so I'd like to process multiple files in parallel. But I don't want the different processes to combine to violate the download limit, so I need something that limits the overall request rate. I was thinking of something like the following, but I'm not exactly an expert with the multiprocessing module.
import multiprocessing
from multiprocessing.managers import BaseManager
import time

class DownloadLimiter(object):

    def __init__(self, time):
        self.time = time
        self.lock = multiprocessing.Lock()

    def get(self, url):
        self.lock.acquire()
        time.sleep(self.time)
        self.lock.release()
        return url

class DownloadManager(BaseManager):
    pass

DownloadManager.register('downloader', DownloadLimiter)

class Worker(multiprocessing.Process):

    def __init__(self, downloader, queue, file_name):
        super().__init__()
        self.downloader = downloader
        self.file_name = file_name
        self.queue = queue

    def run(self):
        while not self.queue.empty():
            url = self.queue.get()
            content = self.downloader.get(url)
            with open(self.file_name, "a+") as fh:
                fh.write(str(content) + "\n")
Then, somewhere else, I run the downloads with
manager = DownloadManager()
manager.start()
downloader = manager.downloader(0.5)

queue = multiprocessing.Queue()
urls = range(50)
for url in urls:
    queue.put(url)

job1 = Worker(downloader, queue, r"foo.txt")
job2 = Worker(downloader, queue, r"bar.txt")
jobs = [job1, job2]

for job in jobs:
    job.start()

for job in jobs:
    job.join()
This seems to do the job on a small scale, but I'm a little wary about whether the locking is really being done correctly.
Also, if there's a better pattern for achieving the same goal, I'd love to hear it.
This can be done cleanly with Ray, which is a library for parallel and distributed Python.
Resources in Ray
When you start Ray, you can tell it what resources are available on that machine. Ray will automatically attempt to determine the number of CPU cores and the number of GPUs, but these can be specified, and in fact arbitrary user-defined resources can be passed in as well, e.g., by calling

ray.init(num_cpus=4, resources={'Network': 2})

This tells Ray that the machine has 4 CPU cores and 2 of a user-defined resource called Network.
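If you want to double-check what got registered, you can query the cluster's total resources with ray.cluster_resources(), which is part of Ray's public API; a minimal sketch:

import ray

ray.init(num_cpus=4, resources={'Network': 2})

# Returns a dict that should include entries like
# {'CPU': 4.0, 'Network': 2.0, ...}.
print(ray.cluster_resources())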
Each Ray "task", which is a schedulable unit of work, has certain resource requirements. By default, a task will require 1 CPU core and nothing else. However, arbitrary resource requirements can be specified by declaring the corresponding function with
@ray.remote(resources={'Network': 1})
def f():
pass
This tells Ray that in order for f
to execute on a "worker" process, there must be 1 CPU core (the default value) and 1 Network
resource available.
Since the machine has 2 of the Network resource and 4 CPU cores, at most 2 copies of f can be executed concurrently. On the other hand, if there is another function g declared with

@ray.remote
def g():
    pass

then four copies of g can be executed concurrently, or two copies of f and two copies of g can be executed concurrently.
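One quick way to see this limit in action is to time a batch of f tasks; a small self-contained sketch, with f changed to sleep for a second so the scheduling waves are visible:

import time
import ray

ray.init(num_cpus=4, resources={'Network': 2})

@ray.remote(resources={'Network': 1})
def f():
    time.sleep(1)

# With only 2 'Network' resources available, the four copies of f
# run in two waves, so this takes roughly 2 seconds rather than 1.
start = time.time()
ray.get([f.remote() for _ in range(4)])
print(time.time() - start)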
Example
Here's an example, with placeholders for the actual functions used to download and process the content.
import ray
import time

max_concurrent_downloads = 2
ray.init(num_cpus=4, resources={'Network': max_concurrent_downloads})

@ray.remote(resources={'Network': 1})
def download_content(url):
    # Download the file.
    time.sleep(1)
    return 'result from ' + url

@ray.remote
def process_result(result):
    # Process the result.
    time.sleep(1)
    return 'processed ' + result

urls = ['url1', 'url2', 'url3', 'url4']

result_ids = [download_content.remote(url) for url in urls]
processed_ids = [process_result.remote(result_id) for result_id in result_ids]

# Wait until the tasks have finished and retrieve the results.
processed_results = ray.get(processed_ids)
Here's a timeline depiction (which you can produce by running ray timeline from the command line and opening the resulting JSON file in chrome://tracing in the Chrome web browser).
In the above script, we submit 4 download_content tasks. Those are the ones that we rate limit by specifying that they require the Network resource (in addition to the default 1 CPU resource). Then we submit 4 process_result tasks, which each require the default 1 CPU resource. The tasks are executed in three stages (just look at the blue boxes).
1. First we execute two download_content tasks, which is as many as can execute at one time (because of the rate limiting). We can't execute any of the process_result tasks yet because they depend on the output of the download_content tasks.
2. Then we execute the remaining two download_content tasks as well as two process_result tasks, because we are not rate limiting the process_result tasks.
3. Then we execute the last two process_result tasks.
Each "row" is one worker process. Time goes from left to right.
You can check out more about how to do this at the Ray documentation.
OK, after the following clarification from the OP:
By "downloads per second" I mean that globally there are no more than a set number of downloads started per second.
I decided to post another answer, as I think my first one could also be of interest for somebody looking to limit the number of concurrently running processes.
I think there is no need to use additional frameworks to solve this problem. The idea is to use downloading threads, which are spawned for each resource link, a resource queue, and a fixed number of processing workers, which are processes, not threads:
#!/usr/bin/env python3
import os
import time
import random
from threading import Thread
from multiprocessing import Process, JoinableQueue


WORKERS = 4
DOWNLOADS_PER_SECOND = 2


def download_resource(url, resource_queue):
    pid = os.getpid()

    t = time.strftime('%H:%M:%S')
    print('Thread {p} is downloading from {u} ({t})'.format(p=pid, u=url, t=t),
          flush=True)
    time.sleep(random.randint(1, 10))

    results = '[resource {}]'.format(url)
    resource_queue.put(results)


def process_resource(resource_queue):
    pid = os.getpid()

    while True:
        res = resource_queue.get()

        print('Process {p} is processing {r}'.format(p=pid, r=res),
              flush=True)
        time.sleep(random.randint(1, 10))

        resource_queue.task_done()


def main():
    resource_queue = JoinableQueue()

    # Start process workers:
    for _ in range(WORKERS):
        worker = Process(target=process_resource,
                         args=(resource_queue,),
                         daemon=True)
        worker.start()

    urls = ['https://link/to/resource/{i}'.format(i=i) for i in range(10)]

    while urls:
        target_urls = urls[:DOWNLOADS_PER_SECOND]
        urls = urls[DOWNLOADS_PER_SECOND:]

        # Start downloader threads:
        for url in target_urls:
            downloader = Thread(target=download_resource,
                                args=(url, resource_queue),
                                daemon=True)
            downloader.start()

        time.sleep(1)

    resource_queue.join()


if __name__ == '__main__':
    main()
The results look something like this:
$ ./limit_download_rate.py
Thread 32482 is downloading from https://link/to/resource/0 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/1 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/2 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/3 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/4 (10:14:10)
Thread 32482 is downloading from https://link/to/resource/5 (10:14:10)
Process 32483 is processing [resource https://link/to/resource/2]
Process 32484 is processing [resource https://link/to/resource/0]
Thread 32482 is downloading from https://link/to/resource/6 (10:14:11)
Thread 32482 is downloading from https://link/to/resource/7 (10:14:11)
Process 32485 is processing [resource https://link/to/resource/1]
Process 32486 is processing [resource https://link/to/resource/3]
Thread 32482 is downloading from https://link/to/resource/8 (10:14:12)
Thread 32482 is downloading from https://link/to/resource/9 (10:14:12)
Process 32484 is processing [resource https://link/to/resource/6]
Process 32485 is processing [resource https://link/to/resource/9]
Process 32483 is processing [resource https://link/to/resource/8]
Process 32486 is processing [resource https://link/to/resource/4]
Process 32485 is processing [resource https://link/to/resource/7]
Process 32483 is processing [resource https://link/to/resource/5]
Here, every second, DOWNLOADS_PER_SECOND downloader threads are started (two in this example), which then download resources and put them into the queue. WORKERS is the number of processes that get resources from the queue for further processing. With this setup, you can limit the number of downloads started per second and have the workers process the obtained resources in parallel.
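To turn the download_resource placeholder into a real download, its body could fetch the URL with the standard library; a rough sketch using urllib.request (error handling and the site's actual URLs are left out, and the timeout value is just an illustrative choice):

from urllib.request import urlopen

def download_resource(url, resource_queue):
    # Fetch the resource body and hand it to the processing workers.
    with urlopen(url, timeout=30) as response:
        resource_queue.put(response.read())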