Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Efficiently download files asynchronously with requests

I want to download files as fast as possible with Python. Here is my code:

import pandas as pd
import requests
from requests_futures.sessions import FuturesSession
import os
import pathlib
from timeit import default_timer as timer


class AsyncDownloader:
    """Download files asynchronously using a thread-backed futures session.

    Configuration lives in class-level attributes so every instance shares
    the same settings; the ``set_*`` methods validate and update them.
    """

    __urls = set()             # URLs gathered from the source csv
    __dest_path = None         # resolved, writable destination directory
    __user_agent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:58.0) Gecko/20100101 Firefox/58.0'
    __read_timeout = 60        # seconds to wait for response data
    __connection_timeout = 30  # seconds to wait for the TCP connect
    __download_count = 0       # 0 means unlimited
    # http://www.browserscope.org/?category=network
    __worker_count = 17        # No of threads to spawn
    __chunk_size = 1024        # bytes written to disk per iteration
    __download_time = -1       # wall-clock duration of the last download()
    __errors = []              # human-readable error messages

    # TODO Fetch only content of a specific type from a csv
    # TODO Improve code structure so that it can be used as a commandline tool

    def set_source_csv(self, source_path, column_name):
        """Read *column_name* from the csv at *source_path* and queue its URLs."""
        self.source_path = source_path
        self.column_name = column_name

        try:
            my_csv = pd.read_csv(source_path, usecols=[self.column_name], chunksize=10)
        except ValueError:
            print("The column name doesn't exist")
            return
        else:
            # No exception whatsoever: accumulate every value of the column.
            for chunk in my_csv:
                AsyncDownloader.__urls.update(set(getattr(chunk, self.column_name)))

    def set_destination_path(self, dest_path):
        """Create *dest_path* if needed and remember it when it is writable."""
        # Normalise away trailing slashes so later path joining stays clean.
        dest_path = dest_path.rstrip('/')
        self.dest_path = dest_path
        # TODO Add exception in case we can't create the directory
        pathlib.Path(self.dest_path).mkdir(parents=True, exist_ok=True)
        if os.access(self.dest_path, os.W_OK):
            AsyncDownloader.__dest_path = pathlib.Path(self.dest_path).resolve()

    def set_user_agent(self, useragent):
        """Override the User-Agent header sent with every request."""
        self.useragent = useragent
        AsyncDownloader.__user_agent = self.useragent

    def set_connection_timeout(self, ctimeout_secs):
        """Set the connect timeout in seconds; negative values are ignored."""
        self.timeout_secs = ctimeout_secs
        if self.timeout_secs >= 0:
            AsyncDownloader.__connection_timeout = self.timeout_secs

    def set_read_timeout(self, rtimeout_secs):
        """Set the read timeout in seconds; negative values are ignored."""
        self.timeout_secs = rtimeout_secs
        if self.timeout_secs >= 0:
            AsyncDownloader.__read_timeout = self.timeout_secs

    def set_download_count(self, file_count):
        """Limit how many files to download; non-positive values are ignored."""
        self.file_count = file_count
        if self.file_count > 0:
            AsyncDownloader.__download_count = self.file_count

    def set_worker_count(self, worker_count):
        """Set the thread-pool size; non-positive values are ignored."""
        self.worker_count = worker_count
        if self.worker_count > 0:
            AsyncDownloader.__worker_count = self.worker_count

    def set_chunk_size(self, chunk_size):
        """Set the per-write chunk size in bytes; non-positive values are ignored."""
        self.chunk_size = chunk_size
        if self.chunk_size > 0:
            AsyncDownloader.__chunk_size = self.chunk_size

    def print_urls(self):
        """Print the set of queued URLs (debug helper)."""
        print(AsyncDownloader.__urls)

    def get_download_time(self):
        """Return the duration of the last download() run, or -1 if never run."""
        return AsyncDownloader.__download_time

    def get_errors(self):
        """Return the accumulated list of error strings."""
        return AsyncDownloader.__errors

    def download(self):
        """Download every queued URL concurrently into the destination path.

        Files that already exist on disk are skipped. Errors from the
        requests layer are collected in ``get_errors()`` rather than raised.
        """
        start = timer()
        try:
            session = FuturesSession(max_workers=AsyncDownloader.__worker_count)
            session.headers.update({'user-agent': AsyncDownloader.__user_agent})
            # requests expects timeout=(connect, read) on each request; the
            # original code mistakenly passed the timeouts to session.request()
            # as the HTTP method and URL, which did nothing useful.
            timeouts = (AsyncDownloader.__connection_timeout,
                        AsyncDownloader.__read_timeout)

            results = []
            # Give an accurate file count even if we don't have to download
            # a file because it already exists on disk.
            file_count = 0

            for url in AsyncDownloader.__urls:
                filename = os.path.basename(url)
                target = AsyncDownloader.__dest_path / filename
                # check if we need only a limited number of files
                if AsyncDownloader.__download_count != 0:
                    # No need to download the file if it already exists
                    if target.is_file():
                        file_count += 1
                        continue
                    if file_count < AsyncDownloader.__download_count:
                        file_count += 1
                        results.append(session.get(url, timeout=timeouts, stream=True))
                else:
                    if not target.is_file():
                        results.append(session.get(url, timeout=timeouts, stream=True))

            for result in results:
                # wait for the response to complete, if it hasn't already
                response = result.result()
                filename = os.path.basename(response.url)
                if response.status_code == 200:
                    target = (AsyncDownloader.__dest_path / filename).resolve()
                    with open(target, 'wb') as fd:
                        for chunk in response.iter_content(chunk_size=AsyncDownloader.__chunk_size):
                            if chunk:  # filter out keep-alive new chunks
                                fd.write(chunk)

            end = timer()
            AsyncDownloader.__download_time = end - start

        # str() is required here: concatenating a str with an exception
        # object raises TypeError and would mask the real error.
        except requests.exceptions.HTTPError as errh:
            AsyncDownloader.__errors.append("Http Error:" + str(errh))
        except requests.exceptions.ConnectionError as errc:
            AsyncDownloader.__errors.append("Error Connecting:" + str(errc))
        except requests.exceptions.Timeout as errt:
            AsyncDownloader.__errors.append("Timeout Error:" + str(errt))
        except requests.exceptions.RequestException as err:
            AsyncDownloader.__errors.append("OOps: Something Else" + str(err))
        else:
            return

The following code is making a very bad assumption. Indeed, I am assuming that the first URL will finish first, which is of course not correct.

# wait for the response to complete, if it hasn't already
response = result.result()

How can I ensure that only requests that have completed are processed, instead of relying on an assumption like the one above, in an efficient manner?

I would appreciate any other suggestion on how to improve performance.

Kind Regards

like image 253
user2650277 Avatar asked Feb 05 '18 17:02

user2650277


Video Answer


4 Answers

Even if the connections were completed in order, you are still processing the files sequentially. The second file must wait for the first one to be written and so on. So, the best thing you can do is processing everything in parallel (this can be done despite the GIL, since io operations like writing to disk and reading from the network will release it). Basically, use regular requests library (not requests-futures) and create a future/thread per request + file-processing.

There are still more ways to make it faster, such as downloading chunks while writing (i.e. two threads, one for the request and one for file-processing), and reading chunks in parallel by making multi-part requests — but that is "download accelerator" territory, and you may not want that kind of complexity in your code.

Edit: Also, chunked downloads are lazy, which means you are only making the initial requests in parallel, but the actual chunked-file download is being made sequentially, since it's being done in the main thread. So, your current approach is not much better than a fully synchronous one. The above advice still stands.

like image 117
nitely Avatar answered Oct 17 '22 10:10

nitely


To work with your code, I created a .csv file that contains links to several robots.txt files from several websites in this order: GitHub, UDemy, YouTube.

After debugging, the first result in

response = result.result()

was (In this order): UDemy, YouTube, GitHub. For the record, the size of each robots.txt is increasing in the same order that I got the results. This means that there was no problem to begin with, despite me setting up a .csv file in a certain order, the results came in the order where the files were first downloaded.

I would appreciate any other suggestion on how to improve performance.

As for performance, you can improve the speed by either creating a thread for writing the response to a file, or use an async IO library, such as Tinche/aiofiles.

If you want to go even further, you could try improving the performance of the program itself by using an alternative implementation of Python, such as PyPy

like image 35
A. Smoliak Avatar answered Oct 17 '22 11:10

A. Smoliak


The simplest way to do this doesn't require any threading or special async code: just use the regular requests library and its built-in streaming option. You say response = session.get(url, stream=True) and then use response.iter_content(chunk_size=1024) (for example) to access the downloaded information one chunk at a time. Here's a functional example:

import requests
import os

def stream_multiple(urls):
    """Download *urls* by round-robining over streamed responses.

    Each response is consumed one 1024-byte chunk at a time and appended
    to a file named after the last path component of its URL, so no
    single slow server blocks the others for more than one chunk.
    """
    # NOTE: the original snippet closed this dict comprehension with ')'
    # instead of '}', which is a SyntaxError.
    responses = {url: requests.get(url, stream=True) for url in urls}
    streams = {url: responses[url].iter_content(chunk_size=1024)
            for url in urls}
    handles = {url: open(os.path.basename(url), 'wb') for url in urls}
    while streams:
        # Snapshot the keys because finished streams are removed in the loop.
        for url in list(streams.keys()):
            try:
                chunk = next(streams[url])
                print("Received {} bytes for {}".format(len(chunk), url))
                handles[url].write(chunk)
            except StopIteration: # no more content
                handles[url].close()
                streams.pop(url)

Sample output:

rat@pandion:~/tmp$ python smu.py
Received 1296 bytes for http://www.gutenberg.org/files/9490/9490-0.txt
Received 1882 bytes for http://www.gutenberg.org/ebooks/21497.txt.utf-8
Received 1524 bytes for http://www.gutenberg.org/files/1729/1729-0.txt
Received 1508 bytes for http://www.gutenberg.org/ebooks/21790.txt.utf-8
Received 1826 bytes for http://www.gutenberg.org/files/9490/9490-0.txt
Received 2349 bytes for http://www.gutenberg.org/ebooks/21497.txt.utf-8
Received 1834 bytes for http://www.gutenberg.org/files/1729/1729-0.txt
Received 1838 bytes for http://www.gutenberg.org/ebooks/21790.txt.utf-8
Received 2009 bytes for http://www.gutenberg.org/files/9490/9490-0.txt
...

You could probably achieve slightly faster performance using threads or multiprocessing, but I doubt that would be significantly better. In virtually all cases writing your data to disk is going to be vastly faster than receiving it from the network.

like image 5
Nathan Vērzemnieks Avatar answered Oct 17 '22 12:10

Nathan Vērzemnieks


You can use gevent if you don't worry about "monkey patch"

import gevent.monkey
import requests

# Maximum number of requests in flight at once.
CONNECTIONS = 10

# patch_all() replaces blocking stdlib primitives (sockets, etc.) with
# cooperative gevent versions; it must run before any sockets are created.
gevent.monkey.patch_all()  # debug in PyCharm: https://blog.jetbrains.com/pycharm/2012/08/gevent-debug-support/

import gevent.pool


def your_request_without_any_changes(url):
    """Plain blocking requests call; made cooperative by the monkey patch."""
    return requests.get(url)


# The pool caps concurrency; imap_unordered yields results as each request
# completes, not in submission order.
pool = gevent.pool.Pool(CONNECTIONS)
for response in pool.imap_unordered(your_request_without_any_changes, ['http://www.google.com'] * 100):
    print(response.status_code)

gevent use "event loop" and patch requests library (actually it happens at a lower level) for switching to another task when we are waiting for the response.

like image 1
ADR Avatar answered Oct 17 '22 10:10

ADR