I want to download files as fast as possible with Python. Here is my code:
import pandas as pd
import requests
from requests_futures.sessions import FuturesSession
import os
import pathlib
from timeit import default_timer as timer


class AsyncDownloader:
    """Download files asynchronously"""
    __urls = set()
    __dest_path = None
    __user_agent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:58.0) Gecko/20100101 Firefox/58.0'
    __read_timeout = 60
    __connection_timeout = 30
    __download_count = 0  # unlimited
    # http://www.browserscope.org/?category=network
    __worker_count = 17  # No of threads to spawn
    __chunk_size = 1024
    __download_time = -1
    __errors = []

    # TODO Fetch only content of a specific type from a csv
    # TODO Improve code structure so that it can be used as a commandline tool

    def set_source_csv(self, source_path, column_name):
        self.source_path = source_path
        self.column_name = column_name
        try:
            my_csv = pd.read_csv(source_path, usecols=[self.column_name], chunksize=10)
        except ValueError:
            print("The column name doesn't exist")
            return
        else:
            # No exception whatsoever
            for chunk in my_csv:
                AsyncDownloader.__urls.update(set(getattr(chunk, self.column_name)))

    def set_destination_path(self, dest_path):
        if dest_path.endswith('/'):
            dest_path = dest_path[:-1]
        self.dest_path = dest_path
        # TODO Add exception in case we can't create the directory
        pathlib.Path(self.dest_path).mkdir(parents=True, exist_ok=True)
        if os.access(self.dest_path, os.W_OK):
            AsyncDownloader.__dest_path = pathlib.Path(self.dest_path).resolve()

    def set_user_agent(self, useragent):
        self.useragent = useragent
        AsyncDownloader.__user_agent = self.useragent

    def set_connection_timeout(self, ctimeout_secs):
        self.timeout_secs = ctimeout_secs
        if self.timeout_secs >= 0:
            AsyncDownloader.__connection_timeout = self.timeout_secs

    def set_read_timeout(self, rtimeout_secs):
        self.timeout_secs = rtimeout_secs
        if self.timeout_secs >= 0:
            AsyncDownloader.__read_timeout = self.timeout_secs

    def set_download_count(self, file_count):
        self.file_count = file_count
        if self.file_count > 0:
            AsyncDownloader.__download_count = self.file_count

    def set_worker_count(self, worker_count):
        self.worker_count = worker_count
        if self.worker_count > 0:
            AsyncDownloader.__worker_count = self.worker_count

    def set_chunk_size(self, chunk_size):
        self.chunk_size = chunk_size
        if self.chunk_size > 0:
            AsyncDownloader.__chunk_size = self.chunk_size

    def print_urls(self):
        print(AsyncDownloader.__urls)

    def get_download_time(self):
        return AsyncDownloader.__download_time

    def get_errors(self):
        return AsyncDownloader.__errors

    def download(self):
        start = timer()
        try:
            session = FuturesSession(max_workers=AsyncDownloader.__worker_count)
            session.headers.update({'user-agent': AsyncDownloader.__user_agent})
            # connect/read timeouts are passed per request below
            timeouts = (AsyncDownloader.__connection_timeout, AsyncDownloader.__read_timeout)
            results = []
            # Give an accurate file count even if we don't have to download it as it already exists
            file_count = 0
            for url in AsyncDownloader.__urls:
                filename = os.path.basename(url)
                # check if we need only a limited number of files
                if AsyncDownloader.__download_count != 0:
                    # No need to download file if it already exists
                    if pathlib.Path(AsyncDownloader.__dest_path / filename).is_file():
                        file_count += 1
                        continue
                    else:
                        if file_count < AsyncDownloader.__download_count:
                            file_count += 1
                            results.append(session.get(url, timeout=timeouts, stream=True))
                else:
                    if not pathlib.Path(AsyncDownloader.__dest_path / filename).is_file():
                        results.append(session.get(url, timeout=timeouts, stream=True))
            for result in results:
                # wait for the response to complete, if it hasn't already
                response = result.result()
                filename = os.path.basename(response.url)
                if response.status_code == 200:
                    with open(pathlib.Path(AsyncDownloader.__dest_path / filename).resolve(), 'wb') as fd:
                        for chunk in response.iter_content(chunk_size=AsyncDownloader.__chunk_size):
                            if chunk:  # filter out keep-alive new chunks
                                fd.write(chunk)
            end = timer()
            AsyncDownloader.__download_time = end - start
        except requests.exceptions.HTTPError as errh:
            AsyncDownloader.__errors.append("Http Error: " + str(errh))
            # print("Http Error:", errh)
        except requests.exceptions.ConnectionError as errc:
            AsyncDownloader.__errors.append("Error Connecting: " + str(errc))
            # print("Error Connecting:", errc)
        except requests.exceptions.Timeout as errt:
            AsyncDownloader.__errors.append("Timeout Error: " + str(errt))
            # print("Timeout Error:", errt)
        except requests.exceptions.RequestException as err:
            AsyncDownloader.__errors.append("Oops: something else: " + str(err))
        else:
            return
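For context, I drive the class roughly like this (the CSV path, column name and destination directory below are just placeholders):

downloader = AsyncDownloader()
downloader.set_source_csv('urls.csv', 'url')    # placeholder CSV file and column name
downloader.set_destination_path('downloads/')   # directory is created if it does not exist
downloader.set_worker_count(17)
downloader.download()
print(downloader.get_download_time())
print(downloader.get_errors())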
The following code makes a very bad assumption. Indeed, I am assuming that the first URL will finish first, which is of course not correct.
# wait for the response to complete, if it hasn't already
response = result.result()
How can I ensure, in an efficient manner, that only requests that have completed are processed, instead of relying on an assumption like the one above?
I would appreciate any other suggestions on how to improve performance.
Kind Regards
Even if the connections were completed in order, you are still processing the files sequentially: the second file must wait for the first one to be written, and so on. So the best thing you can do is process everything in parallel (this can be done despite the GIL, since I/O operations like writing to disk and reading from the network release it). Basically, use the regular requests library (not requests-futures) and create a future/thread per request plus file-processing.
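A minimal sketch of that idea with plain requests and concurrent.futures (the URL list, worker count and timeouts below are placeholders, not values from your code) could look like this:

import os
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests


def download_one(url, dest_dir='.'):
    # Each worker does the whole job: request, stream and write to disk.
    path = os.path.join(dest_dir, os.path.basename(url))
    with requests.get(url, stream=True, timeout=(30, 60)) as response:
        response.raise_for_status()
        with open(path, 'wb') as fd:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    fd.write(chunk)
    return path


urls = ['http://www.gutenberg.org/files/1729/1729-0.txt']  # placeholder list
with ThreadPoolExecutor(max_workers=17) as executor:
    futures = [executor.submit(download_one, url) for url in urls]
    for future in as_completed(futures):  # yields futures as they finish, not in submission order
        print('finished', future.result())

as_completed also addresses the ordering concern: it hands you futures in the order they complete, so nothing blocks on a slow first request.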
There are still more ways to make it faster, like continuing to download chunks while writing (i.e. two threads, one for the request and one for file-processing), and reading chunks in parallel by making multi-part requests, which is "download accelerator" territory and you may not want that kind of complexity in your code.
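Just to illustrate what multi-part requests mean, a rough sketch using HTTP Range headers (assuming the server supports ranges and reports a Content-Length; the URL and part count are placeholders) would be:

import requests

url = 'http://example.com/big.bin'  # placeholder
size = int(requests.head(url).headers['Content-Length'])
part_count = 4
part_size = size // part_count

parts = []
for i in range(part_count):
    start = i * part_size
    end = size - 1 if i == part_count - 1 else start + part_size - 1
    # In a real accelerator each range would be fetched in its own thread.
    r = requests.get(url, headers={'Range': 'bytes={}-{}'.format(start, end)})
    parts.append(r.content)

with open('big.bin', 'wb') as fd:
    for part in parts:
        fd.write(part)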
Edit: Also, chunked downloads are lazy, which means you are only making the initial requests in parallel; the actual chunked file download happens sequentially, since it is done in the main thread. So your current approach is not much better than a fully synchronous one. The above advice still stands.
To work with your code, I created a .csv file that contains links to the robots.txt files of several websites, in this order: GitHub, UDemy, YouTube.
After debugging, the results from response = result.result() came back in this order: UDemy, YouTube, GitHub.
For the record, the size of each robots.txt increases in the same order in which I got the results.
This means that there was no problem to begin with: even though I set up the .csv file in a certain order, the results came back in the order in which the files finished downloading.
I would appreciate any other suggestions on how to improve performance.
As for performance, you can improve the speed either by creating a thread that writes the response to a file, or by using an async I/O library such as Tinche/aiofiles.
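A rough sketch of the aiofiles route, paired here with aiohttp for the network side (aiohttp is my own assumption, it is not part of your code), might look like:

import asyncio
import os

import aiofiles
import aiohttp


async def fetch(session, url, dest_dir='.'):
    # Stream the response and write it to disk without blocking the event loop.
    path = os.path.join(dest_dir, os.path.basename(url))
    async with session.get(url) as response:
        async with aiofiles.open(path, 'wb') as fd:
            async for chunk in response.content.iter_chunked(1024):
                await fd.write(chunk)


async def main(urls):
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(fetch(session, url) for url in urls))


asyncio.run(main(['https://github.com/robots.txt']))  # placeholder URL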
If you want to go even further, you could try improving the performance of the program itself by using an alternative implementation of Python, such as PyPy.
The simplest way to do this doesn't require any threading or special async code: just use the regular requests library and its built-in streaming option. You say response = requests.get(url, stream=True) and then use response.iter_content(chunk_size=1024) (for example) to access the downloaded information one chunk at a time. Here's a functional example:
import requests
import os


def stream_multiple(urls):
    responses = {url: requests.get(url, stream=True) for url in urls}
    streams = {url: responses[url].iter_content(chunk_size=1024)
               for url in urls}
    handles = {url: open(os.path.basename(url), 'wb') for url in urls}
    while streams:
        for url in list(streams.keys()):
            try:
                chunk = next(streams[url])
                print("Received {} bytes for {}".format(len(chunk), url))
                handles[url].write(chunk)
            except StopIteration:  # no more content
                handles[url].close()
                streams.pop(url)
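Presumably the output below comes from driving the function with a list of URLs, e.g.:

if __name__ == '__main__':
    stream_multiple([
        'http://www.gutenberg.org/files/9490/9490-0.txt',
        'http://www.gutenberg.org/ebooks/21497.txt.utf-8',
        'http://www.gutenberg.org/files/1729/1729-0.txt',
        'http://www.gutenberg.org/ebooks/21790.txt.utf-8',
    ])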
Sample output:
rat@pandion:~/tmp$ python smu.py
Received 1296 bytes for http://www.gutenberg.org/files/9490/9490-0.txt
Received 1882 bytes for http://www.gutenberg.org/ebooks/21497.txt.utf-8
Received 1524 bytes for http://www.gutenberg.org/files/1729/1729-0.txt
Received 1508 bytes for http://www.gutenberg.org/ebooks/21790.txt.utf-8
Received 1826 bytes for http://www.gutenberg.org/files/9490/9490-0.txt
Received 2349 bytes for http://www.gutenberg.org/ebooks/21497.txt.utf-8
Received 1834 bytes for http://www.gutenberg.org/files/1729/1729-0.txt
Received 1838 bytes for http://www.gutenberg.org/ebooks/21790.txt.utf-8
Received 2009 bytes for http://www.gutenberg.org/files/9490/9490-0.txt
...
You could probably achieve slightly faster performance using threads or multiprocessing, but I doubt that would be significantly better. In virtually all cases writing your data to disk is going to be vastly faster than receiving it from the network.
You can use gevent if you don't mind the "monkey patching".
import gevent.monkey
import requests

CONNECTIONS = 10

gevent.monkey.patch_all()  # debug in PyCharm: https://blog.jetbrains.com/pycharm/2012/08/gevent-debug-support/

import gevent.pool


def your_request_without_any_changes(url):
    return requests.get(url)


pool = gevent.pool.Pool(CONNECTIONS)
for response in pool.imap_unordered(your_request_without_any_changes, ['http://www.google.com'] * 100):
    print(response.status_code)
gevent uses an "event loop" and patches the requests library (actually, this happens at a lower level) so that it switches to another task while we are waiting for a response.