I am connecting to a local server (OSRM) via HTTP to submit routes and get back drive-times. I notice that asynchronous I/O is slower than threading because the wait for the calculation is shorter than the time it takes to send the request and process the JSON output (I think async I/O pays off when the server takes a while to process your request, so you don't want to block while waiting - that isn't my case). Threading suffers from the Global Interpreter Lock, so it appears (with evidence below) that my fastest option is multiprocessing.
The issue with multiprocessing is that it is so fast that it exhausts my sockets and I get an error (requests issues a new connection each time). I can (in serial) use a requests.Session() object to keep a connection alive, however I can't get this working in parallel (each process has its own session).
The closest code I have to working at the moment is this multiprocessing code:
import json
from urllib3 import HTTPConnectionPool
from multiprocessing import Pool, cpu_count

conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())

def ReqOsrm(url_input):
    ul, qid = url_input
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, ul))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, ul))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run:
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
However, I can't get the HTTPConnectionPool to work properly and it creates new sockets each time (I think) and then gives me the error:
HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))
My goal is to get distance calculations from an OSRM-routing server I am running locally (as quickly as possible).
I have a question in two parts - basically I am trying to convert some code using multiprocessing.Pool() to better code (proper asynchronous functions - so that the execution never breaks and it runs as fast as possible).
The issue I am having is that everything I try seems slower than multiprocessing (I present several examples below of what I have tried).
Some potential methods are: gevent, grequests, tornado, requests-futures, asyncio, etc.
A - Multiprocessing.Pool()
I initially started with something like this:
def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    #print(req_url)
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
                ....

pool = Pool(cpu_count() - 1)
calc_routes = pool.map(ReqOsrm, url_routes)
Here I was connecting to a local server (localhost, port 5005) which was launched with 8 threads and supports parallel execution.
After a bit of searching I realised the error I was getting was because requests opens a new connection/socket for each request. So this was actually too fast and exhausted sockets after a while. It seems the way to address this is to use a requests.Session() - however I haven't been able to get this working with multiprocessing (where each process has its own session).
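To be concrete, the per-process-session pattern I am referring to would look roughly like this (a minimal sketch only; variable names are illustrative, and it is essentially the pattern I end up using in P8/P10 in my conclusions below):

from multiprocessing import Pool, cpu_count
import requests

session = None  # one Session per worker process

def init_worker():
    global session
    session = requests.Session()

def ReqOsrmSession(url_input):
    url, qid = url_input
    try:
        json_geocode = session.get(url).json()
        return [qid, 200,
                json_geocode['route_summary']['total_time'],
                json_geocode['route_summary']['total_distance']]
    except Exception:
        return [qid, 999, 0, 0]

# pool = Pool(cpu_count(), initializer=init_worker)
# calc_routes = pool.map(ReqOsrmSession, url_routes)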
Question 1.
On some of the computers this runs fine, e.g.:
To compare against later: 45% server usage and 1700 requests per second
However, on some it does not and I don't fully understand why:
HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))
My guess is that, since requests locks the socket while it is in use, sometimes the server is too slow to respond to the old request and a new one is generated. The server supports queueing, but requests does not, so instead of the request being added to the queue I get the error?
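(One thing I have considered but not verified: mounting an HTTPAdapter with a urllib3 Retry policy on the session, so that failed connection attempts back off and retry instead of raising immediately - a minimal sketch:)

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retries = Retry(total=5, backoff_factor=0.1)  # retry up to 5 times with back-off
session.mount('http://', HTTPAdapter(max_retries=retries, pool_maxsize=100))
# response = session.get(req_url)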
Question 2.
I found:
Blocking Or Non-Blocking?
With the default Transport Adapter in place, Requests does not provide any kind of non-blocking IO. The Response.content property will block until the entire response has been downloaded. If you require more granularity, the streaming features of the library (see Streaming Requests) allow you to retrieve smaller quantities of the response at a time. However, these calls will still block.
If you are concerned about the use of blocking IO, there are lots of projects out there that combine Requests with one of Python’s asynchronicity frameworks.
Two excellent examples are grequests and requests-futures.
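For completeness, a minimal grequests sketch (gevent-based; I have not benchmarked this one, and size caps the number of concurrent requests):

import grequests

reqs = [grequests.get(url) for url, qid in url_routes]
responses = grequests.map(reqs, size=100)
# grequests.map preserves order, so results can be matched back to query ids by position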
B - requests-futures
To address this I needed to rewrite my code to use asynchronous requests so I tried the below using:
from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
(By the way I start my server with the option to use all threads)
And the main code:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url | query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
        calc_routes.append(row)
Where my function (ReqOsrm) is now rewritten as:
def ReqOsrm(sess, resp):
    json_geocode = resp.json()
    status = int(json_geocode['status'])
    # Found route between points
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    # Cannot find route between points (code errors as 999)
    else:
        out = [999, 0, 0, 0, 0, 0, 0]
    resp.data = out
However, this code is slower than the multiprocessing one! Before I was getting around 1700 requests per second, now I am getting 600 per second. I guess this is because I don't have full CPU utilisation, but I'm not sure how to increase it?
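One untested idea would be to shard the URLs across processes and run a smaller FuturesSession inside each, so that more than one core does the JSON parsing (fetch_with_futures below is hypothetical - it would wrap the requests-futures block above into a function taking a list of (url, qid) pairs):

from multiprocessing import Pool, cpu_count

def chunks(lst, n):
    # split lst into n roughly equal parts
    k = max(1, len(lst) // n)
    return [lst[i:i + k] for i in range(0, len(lst), k)]

def run_chunk(chunk):
    return fetch_with_futures(chunk, max_workers=50)  # hypothetical wrapper

# with Pool(cpu_count()) as p:
#     parts = p.map(run_chunk, chunks(url_routes, cpu_count()))
# calc_routes = [row for part in parts for row in part]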
C - Thread
I tried another method (creating threads), but again I wasn't sure how to get this to maximise CPU usage (ideally I want to see my server at 50% usage, no?):
def doWork():
    while True:
        url, qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()

def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None

def processReq(status, resp, qid):
    try:
        json_geocode = resp.json()
        # Found route between points
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from = json_geocode['via_points'][0]
            used_to = json_geocode['via_points'][1]
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        else:
            print("Done but no route")
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("Error: %s" % err)
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    qres.put(out)
    return

# Run:
concurrent = 1000
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in url_routes:
        q.put(url)
    q.join()
except Exception:
    pass
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]
This method is faster than requests_futures, I think, but I don't know how many threads to set to maximise it.
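One way to pick the thread count would be to sweep it empirically - a rough sketch, assuming a hypothetical run_with_threads(n, urls) wrapper around the worker/queue code above with concurrent set to n:

import time

def benchmark(thread_counts, urls):
    for n in thread_counts:
        start = time.time()
        run_with_threads(n, urls)  # hypothetical: the code above, parameterised on `concurrent`
        rps = len(urls) / (time.time() - start)
        print("threads=%d -> %.0f req/s" % (n, rps))

# benchmark([50, 100, 250, 500, 1000], url_routes)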
D - tornado (not working)
I am now trying tornado, but I can't quite get it working. It breaks with exit code -1073741819 if I use curl_httpclient; if I use simple_httpclient it works but then I get timeout errors:
ERROR:tornado.application:Multiple exceptions in yield list
Traceback (most recent call last):
  File "C:\Anaconda3\lib\site-packages\tornado\gen.py", line 789, in callback
    result_list.append(f.result())
  File "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", line 232, in result
    raise_exc_info(self._exc_info)
  File "", line 3, in raise_exc_info
tornado.httpclient.HTTPError: HTTP 599: Timeout
from functools import partial
from tornado import gen, ioloop
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient

def handle_req(r):
    try:
        json_geocode = json_decode(r)
        status = int(json_geocode['status'])
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        print(out)
    except Exception as err:
        print(err)
        out = [999, 0, 0, 0, 0, 0, 0]
    return out

# Configure
# For some reason curl_httpclient crashes my computer
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10)

@gen.coroutine
def run_experiment(urls):
    http_client = AsyncHTTPClient()
    responses = yield [http_client.fetch(url) for url, qid in urls]
    responses_out = [handle_req(r.body) for r in responses]
    raise gen.Return(value=responses_out)

# Initialise
_ioloop = ioloop.IOLoop.instance()
run_func = partial(run_experiment, url_routes)
calc_routes = _ioloop.run_sync(run_func)
E - asyncio / aiohttp
Decided to try another approach (although would be great to get tornado working) using asyncio and aiohttp.
import asyncio
import aiohttp
import json

def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    status = int(json_geocode['status'])
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    else:
        print("Done, but no route for {0} - status: {1}".format(qid, status))
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    return out

def chunked_http_client(num_chunks):
    # Use a semaphore to limit the number of concurrent requests
    semaphore = asyncio.Semaphore(num_chunks)
    # Return a co-routine that will download files asynchronously and
    # respect the locking of the semaphore
    @asyncio.coroutine
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            response = yield from aiohttp.request('GET', url)
            body = yield from response.content.read()
            yield from response.wait_for_close()
        return body, qid
    return http_get

@asyncio.coroutine
def run_experiment(urls):
    http_client = chunked_http_client(500)
    # http_client returns futures; save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid, err))
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
        response.append(out)
    return response

# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))
This works OK, but it is still slower than multiprocessing!
A process is a program under execution, i.e. an active program. A thread is a lightweight unit of execution within a process that can be scheduled independently. Context switching between processes takes more time than between threads because processes are heavier.
Differences between threading and multiprocessing: starting a thread is faster than starting a process; memory is shared between all threads, so mutexes are often necessary to control access to shared data; and there is one GIL (Global Interpreter Lock) for all threads (important).
Multiprocessing is often easier to just drop in than threading but has a higher memory overhead. If your code is CPU-bound, multiprocessing is most likely going to be the better choice, especially if the target machine has multiple cores or CPUs.
Python (CPython) does have a threading library, and the GIL does not prevent you from creating threads; it simply means only one thread executes Python bytecode at a time, so threads cannot achieve true multi-core parallelism for CPU-bound work (they remain useful for I/O-bound work).
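A minimal illustration of the GIL point (timings will vary by machine): for CPU-bound work, threads are serialised by the GIL while processes genuinely run in parallel.

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_task(n):
    # pure-Python, CPU-bound work
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    work = [2000000] * 8
    for name, Executor in [('threads', ThreadPoolExecutor), ('processes', ProcessPoolExecutor)]:
        start = time.time()
        with Executor(max_workers=4) as ex:
            list(ex.map(cpu_task, work))
        print(name, round(time.time() - start, 2), 's')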
Thanks everyone for the help. I wanted to post my conclusions:
Since my HTTP requests are to a local server which processes each request almost instantly, it does not make much sense for me to use async approaches (compared to most cases, when requests are sent over the internet). The costly factor for me is actually sending the request and processing the response, which means I get much better speeds using multiple processes (threads suffer from the GIL). I should also use sessions to increase the speed (no need to close and re-open a connection to the SAME server) and to help prevent port exhaustion.
Here are all the methods tried (working) with example RPS:
Serial
S1. Serial GET request (no session) -> 215 RPS
def ReqOsrm(data):
    url, qid = data
    try:
        response = requests.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]

# Run:
calc_routes = [ReqOsrm(x) for x in url_routes]
S2. Serial GET request (requests.Session()) -> 335 RPS
session = requests.Session()

def ReqOsrm(data):
    url, qid = data
    try:
        response = session.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]

# Run:
calc_routes = [ReqOsrm(x) for x in url_routes]
S3. Serial GET request (urllib3.HTTPConnectionPool) -> 545 RPS
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)

def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]

# Run:
calc_routes = [ReqOsrm(x) for x in url_routes]
Async IO
A4. AsyncIO with aiohttp -> 450 RPS
import asyncio
import aiohttp
import json

concurrent = 100

def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    tot_time_s = json_geocode['paths'][0]['time']
    tot_dist_m = json_geocode['paths'][0]['distance']
    return [qid, 200, tot_time_s, tot_dist_m]

def chunked_http_client(num_chunks):
    # Use a semaphore to limit the number of concurrent requests
    semaphore = asyncio.Semaphore(num_chunks)
    # Return a co-routine that will download files asynchronously and
    # respect the locking of the semaphore
    @asyncio.coroutine
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            with aiohttp.ClientSession() as session:
                response = yield from session.get(url)
                body = yield from response.content.read()
                yield from response.wait_for_close()
        return body, qid
    return http_get

@asyncio.coroutine
def run_experiment(urls):
    http_client = chunked_http_client(num_chunks=concurrent)
    # http_client returns futures; save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid, err))
            out = [qid, 999, 0, 0]
        response.append(out)
    return response

# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))
A5. Threading without sessions -> 330 RPS
from threading import Thread
from queue import Queue
import requests
import json

concurrent = 100

def doWork():
    while True:
        url, qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()

def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None

def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return

# Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]
A6. Threading with HTTPConnectionPool -> 1550 RPS
from threading import Thread
from queue import Queue
from urllib3 import HTTPConnectionPool
import json

concurrent = 100
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent)

def doWork():
    while True:
        url, qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()

def getReq(url):
    try:
        resp = conn_pool.request('GET', url)
        return resp.status, resp
    except:
        return 999, None

def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return

# Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]
A7. requests-futures -> 520 RPS
from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed

concurrent = 100

def ReqOsrm(sess, resp):
    try:
        json_geocode = resp.json()
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err)
        out = [999, 0, 0]
    resp.data = out

# Run:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=concurrent)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url | query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0]
        calc_routes.append(row)
Multiple Processes
P8. multiprocessing.worker + queue + requests.session() -> 1058 RPS
from multiprocessing import Process, Queue, JoinableQueue, cpu_count
import requests
import json

class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout

    def run(self):
        s = requests.session()
        while not self.qin.empty():
            url, qid = self.qin.get()
            data = s.get(url)
            self.qout.put(ReqOsrm(data, qid))
            self.qin.task_done()

def ReqOsrm(resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid)
        return [qid, 999, 0, 0]

# Run:
qout = Queue()
qin = JoinableQueue()
[qin.put(url_q) for url_q in url_routes]
[Worker(qin, qout).start() for _ in range(cpu_count())]
qin.join()
calc_routes = []
while not qout.empty():
    calc_routes.append(qout.get())
P9. multiprocessing.worker + queue + HTTPConnectionPool() -> 1230 RPS
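(The exact code for P9 isn't shown; a minimal sketch, assuming it mirrors P8 but with a per-process urllib3 HTTPConnectionPool instead of a requests session:)

from multiprocessing import Process, JoinableQueue, Queue, cpu_count
from urllib3 import HTTPConnectionPool
import json

class PoolWorker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(PoolWorker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout

    def run(self):
        # one connection pool per worker process
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)
        while not self.qin.empty():
            url, qid = self.qin.get()
            try:
                resp = conn_pool.request('GET', url)
                json_geocode = json.loads(resp.data.decode('utf-8'))
                out = [qid, 200, json_geocode['paths'][0]['time'],
                       json_geocode['paths'][0]['distance']]
            except Exception:
                out = [qid, 999, 0, 0]
            self.qout.put(out)
            self.qin.task_done()

# run exactly as in P8: fill qin, start cpu_count() workers, join, then drain qout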
P10. multiprocessing v2 (not really sure how this is different) -> 1350 RPS
conn_pool = None

def makePool(host, port):
    global conn_pool
    conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)

def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        return [qid, 999, 0, 0]

# Run:
pool = Pool(initializer=makePool, initargs=(ghost, gport))
calc_routes = pool.map(ReqOsrm, url_routes)
So, in conclusion, it seems that the best methods for me are #10 (and, surprisingly, #6).