My task is to download 1M+ images from a given list of URLs. What is the recommended way to do so?
After reading Greenlet Vs. Threads I looked into gevent, but I can't get it to run reliably. I played around with a test set of 100 URLs: sometimes it finishes in 1.5 s, but sometimes it takes over 30 s, which is strange because the timeout* per request is 0.1 s, so it should never take more than 10 s.
* see the code below
I also looked into grequests, but it seems to have issues with exception handling.
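For reference, this is roughly the pattern I mean (grequests.map takes an exception_handler callback for failed requests; the URL list, pool size, and the handle_error helper here are placeholders, not my actual code):

import grequests

urls = []  # placeholder: list of image URLs

def handle_error(request, exception):
    # called for each request that raised, instead of aborting the whole map
    print 'failed:', request.url, exception

reqs = (grequests.get(u, timeout=0.1) for u in urls)
responses = grequests.map(reqs, size=100, exception_handler=handle_error)
# failed requests show up as None in `responses`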
My 'requirements' are that I can
from gevent import monkey; monkey.patch_all()
from time import time
import requests
from PIL import Image
import cStringIO
import gevent.hub

POOL_SIZE = 300

def download_image_wrapper(task):
    return download_image(task[0], task[1])

def download_image(image_url, download_path):
    raw_binary_request = requests.get(image_url, timeout=0.1).content
    image = Image.open(cStringIO.StringIO(raw_binary_request))
    image.save(download_path)

def download_images_gevent_spawn(list_of_image_urls, base_folder):
    download_paths = ['/'.join([base_folder, url.split('/')[-1]])
                      for url in list_of_image_urls]
    parameters = [[image_url, download_path] for image_url, download_path in
                  zip(list_of_image_urls, download_paths)]
    tasks = [gevent.spawn(download_image_wrapper, parameter_tuple) for parameter_tuple in parameters]
    for task in tasks:
        try:
            task.get()
        except Exception:
            print 'x',
            continue
        print '.',

test_urls = # list of 100 urls
t1 = time()
download_images_gevent_spawn(test_urls, 'download_temp')
print time() - t1
I think it would be better to stick with urllib2, following the example of https://github.com/gevent/gevent/blob/master/examples/concurrent_download.py#L1
Try this code; I suppose it's what you're asking for.
import gevent
from gevent import monkey

# patches stdlib (including socket and ssl modules) to cooperate with other greenlets
monkey.patch_all()

import sys

urls = sorted(chloya_files)  # chloya_files: your own list of image URLs

if sys.version_info[0] == 3:
    from urllib.request import urlopen
else:
    from urllib2 import urlopen

def download_file(url):
    data = urlopen(url).read()
    img_name = url.split('/')[-1]
    with open('c:/temp/img/' + img_name, 'wb') as f:
        f.write(data)
    return True

from time import time

t1 = time()
tasks = [gevent.spawn(download_file, url) for url in urls]
gevent.joinall(tasks, timeout=12.0)
print("Successful: %s from %s" % (sum(1 if task.value else 0 for task in tasks), len(tasks)))
print(time() - t1)
There's a simple solution using gevent and Requests: simple-requests.
Use a Requests Session for HTTP persistent connections. Since gevent makes Requests asynchronous, I think there's no need for a timeout in the HTTP requests.
By default, requests.Session caches TCP connections (pool_connections) for 10 hosts and limits the number of concurrent HTTP requests per cached connection pool to 10 (pool_maxsize). The defaults should be tweaked to suit your needs by explicitly creating an HTTP adapter.
session = requests.Session()
http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
session.mount('http://', http_adapter)
Split the work into producer and consumer tasks: image downloading is the producer task and image processing is the consumer task.
If the image processing library PIL is not asynchronous, it may block the producer coroutines. If so, the consumer pool can be a gevent.threadpool.ThreadPool, e.g.:
from gevent.threadpool import ThreadPool
consumer = ThreadPool(POOL_SIZE)
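A minimal, untested sketch of how that thread-pool consumer would slot in; it reuses the download, process and POOL_SIZE names from the full example below, and only the consumer pool changes:

from gevent.pool import Pool
from gevent.threadpool import ThreadPool

producer = Pool(POOL_SIZE)        # greenlets handle the network I/O
consumer = ThreadPool(POOL_SIZE)  # real threads, so blocking PIL calls don't stall
                                  # the gevent loop (a smaller size may be enough)

def run(urls):
    # ThreadPool offers the same map() API as gevent.pool.Pool,
    # so the producer-consumer wiring stays identical.
    consumer.map(process, producer.imap_unordered(download, urls))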
This is an overview of how it can be done. I didn't test the code.
from gevent import monkey; monkey.patch_all()
from time import time
import requests
from PIL import Image
from io import BytesIO
import os
from urlparse import urlparse
from gevent.pool import Pool

def download(url):
    try:
        response = session.get(url)
    except Exception as e:
        print(e)
    else:
        if response.status_code == requests.codes.ok:
            file_name = urlparse(url).path.rsplit('/', 1)[-1]
            return (response.content, file_name)
        response.raise_for_status()

def process(img):
    if img is None:
        return None
    img, name = img
    img = Image.open(BytesIO(img))
    path = os.path.join(base_folder, name)
    try:
        img.save(path)
    except Exception as e:
        print(e)
    else:
        return True

def run(urls):
    consumer.map(process, producer.imap_unordered(download, urls))

if __name__ == '__main__':
    POOL_SIZE = 300
    producer = Pool(POOL_SIZE)
    consumer = Pool(POOL_SIZE)

    session = requests.Session()
    http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
    session.mount('http://', http_adapter)

    test_urls = # list of 100 urls
    base_folder = 'download_temp'

    t1 = time()
    run(test_urls)
    print(time() - t1)