I am using the non-blocking I/O Python server Tornado. I have a class of GET requests which may take a significant amount of time to complete (think in the range of 5-10 seconds). The problem is that Tornado blocks on these requests so that subsequent fast requests are held up until the slow request completes.
I looked at https://github.com/facebook/tornado/wiki/Threading-and-concurrency and came to the conclusion that I wanted some combination of #3 (other processes) and #4 (other threads). #4 on its own had issues: I was unable to get reliable control back to the ioloop while another thread was doing the "heavy_lifting". (I assume that this was due to the GIL and the fact that the heavy_lifting task has high CPU load and keeps pulling control away from the main ioloop, but that's a guess.)
So I have been prototyping how to solve this by doing the "heavy lifting" tasks within these slow GET requests in a separate process and then placing a callback back into the Tornado ioloop when the process is done, to finish the request. This frees up the ioloop to handle other requests.
I have created a simple example demonstrating a possible solution, but am curious to get feedback from the community on it.
My question is two-fold: How can this current approach be simplified? What pitfalls potentially exist with it?
1. Utilize Tornado's builtin asynchronous decorator, which allows a request to stay open and the ioloop to continue.
2. Spawn a separate process for "heavy lifting" tasks using Python's multiprocessing module. I first attempted to use the threading module but was unable to get any reliable relinquishing of control back to the ioloop. It also appears that multiprocessing would take advantage of multiple cores.
3. Start a 'watcher' thread in the main ioloop process using the threading module, whose job is to watch a multiprocessing.Queue for the result of the "heavy lifting" task when it completes. This was needed because I needed a way to know that the heavy_lifting task had completed while still being able to notify the ioloop that this request was now finished.
4. Be sure that the 'watcher' thread relinquishes control to the main ioloop often with time.sleep(0) calls so that other requests continue to get readily processed.
5. When there is a result in the queue, add a callback from the "watcher" thread using tornado.ioloop.IOLoop.instance().add_callback(), which is documented to be the only safe way to call ioloop instances from other threads.
6. Be sure to then call finish() in the callback to complete the request and hand over a reply.
Below is some sample code showing this approach. multi_tornado.py is the server implementing the above outline and call_multi.py is a sample script that calls the server in two different ways to test the server. Both tests call the server with 3 slow GET requests followed by 20 fast GET requests. The results are shown for running both with and without the threading turned on.
In the case of running it with "no threading" the 3 slow requests block (each taking a little over a second to complete). A few of the 20 fast requests squeeze through in between some of the slow requests within the ioloop (not totally sure how that occurs - but could be an artifact that I am running both the server and client test script on the same machine). The point here being that all of the fast requests are held up to varying degrees.
In the case of running it with threading enabled, the 20 fast requests all complete immediately and the three slow requests complete at about the same time afterwards, as they have each been running in parallel. This is the desired behavior. The three slow requests take about 2.5 seconds to complete in parallel, whereas in the non-threaded case the three slow requests take about 3.5 seconds in total. So the total time drops by roughly 30% (I assume due to multicore sharing). But more importantly, the fast requests were handled immediately, ahead of the slow ones.
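As a rough check of that speed-up, the arithmetic can be redone from the per-request times printed in the two runs in this post. This is only a sketch: it assumes the three parallel workers started at about the same moment, so the wall time is approximated by the slowest one, and it ignores request overhead outside heavy_lifting.

```python
# Per-request compute times copied from the two runs printed in this post.
blocking = [1.338, 1.169, 1.130]   # /slow: handled one after another
parallel = [2.485, 2.491, 2.517]   # /slow_threaded: separate processes

serial_total = sum(blocking)       # back-to-back requests add up
parallel_total = max(parallel)     # assumption: wall time ~ slowest worker

speedup = serial_total / parallel_total
reduction = 1 - parallel_total / serial_total

print("serial %.3f s, parallel %.3f s" % (serial_total, parallel_total))
print("speed-up %.2fx, time saved %.0f%%" % (speedup, 100 * reduction))
```

With these numbers the serial total is 3.637 s against 2.517 s in parallel, which is a speed-up of about 1.44x, or roughly 31% less time overall.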
I do not have a lot of experience with multithreaded programming, so while this seemingly works here I am curious to learn:
Is there a simpler way to accomplish this? What monsters may lurk within this approach?
(Note: A future tradeoff may be to just run more instances of Tornado with a reverse proxy like nginx doing load balancing. No matter what I will be running multiple instances with a load balancer - but I am concerned about just throwing hardware at this problem since it seems that the hardware is so directly coupled to the problem in terms of the blocking.)
multi_tornado.py (sample server):
    import time
    import threading
    import multiprocessing
    import math

    from tornado.web import RequestHandler, Application, asynchronous
    from tornado.ioloop import IOLoop


    # run in some other process - put result in q
    def heavy_lifting(q):
        t0 = time.time()
        for k in range(2000):
            math.factorial(k)

        t = time.time()
        q.put(t - t0)  # report time to compute in queue


    class FastHandler(RequestHandler):
        def get(self):
            res = 'fast result ' + self.get_argument('id')
            print res
            self.write(res)
            self.flush()


    class MultiThreadedHandler(RequestHandler):
        # Note: This handler can be called with threaded = True or False
        def initialize(self, threaded=True):
            self._threaded = threaded
            self._q = multiprocessing.Queue()

        def start_process(self, worker, callback):
            # method to start process and watcher thread
            self._callback = callback

            if self._threaded:
                # launch process
                multiprocessing.Process(target=worker, args=(self._q,)).start()

                # start watching for process to finish
                threading.Thread(target=self._watcher).start()

            else:
                # threaded = False just call directly and block
                worker(self._q)
                self._watcher()

        def _watcher(self):
            # watches the queue for process result
            while self._q.empty():
                time.sleep(0)  # relinquish control if not ready

            # put callback back into the ioloop so we can finish request
            response = self._q.get(False)
            IOLoop.instance().add_callback(lambda: self._callback(response))


    class SlowHandler(MultiThreadedHandler):
        @asynchronous
        def get(self):
            # start a thread to watch for
            self.start_process(heavy_lifting, self._on_response)

        def _on_response(self, delta):
            _id = self.get_argument('id')
            res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
            print res
            self.write(res)
            self.flush()
            self.finish()   # be sure to finish request


    application = Application([
        (r"/fast", FastHandler),
        (r"/slow", SlowHandler, dict(threaded=False)),
        (r"/slow_threaded", SlowHandler, dict(threaded=True)),
    ])

    if __name__ == "__main__":
        application.listen(8888)
        IOLoop.instance().start()
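One possible simplification of the _watcher method above, offered as a sketch rather than a tested replacement: instead of polling q.empty() with time.sleep(0), the watcher thread can block on q.get(), which waits without spinning. The names watch_queue, start_watcher and demo are hypothetical, and schedule stands in for a thread-safe scheduling call such as IOLoop.instance().add_callback. The demo feeds the queue by hand so that it runs without Tornado or a worker process; it is written for Python 3.

```python
import multiprocessing
import threading


def watch_queue(q, schedule, on_result, timeout=None):
    """Wait for one result on q, then hand it to the main loop.

    schedule is a stand-in for a thread-safe call such as
    IOLoop.instance().add_callback; on_result finishes the request.
    """
    # A blocking get() waits for data instead of looping on q.empty(),
    # so this thread does not compete with the main thread while idle.
    result = q.get(True, timeout)
    schedule(lambda: on_result(result))


def start_watcher(q, schedule, on_result):
    """Run watch_queue on a daemon thread and return the thread."""
    t = threading.Thread(target=watch_queue, args=(q, schedule, on_result))
    t.daemon = True  # do not keep the server alive just for the watcher
    t.start()
    return t


def demo():
    """Feed the queue by hand; a real worker process would call q.put()."""
    q = multiprocessing.Queue()
    scheduled, received = [], []
    t = start_watcher(q, scheduled.append, received.append)
    q.put(1.25)        # pretend heavy_lifting reported 1.25 s
    t.join(5)          # wait for the watcher to pick it up
    for callback in scheduled:  # the event loop would run these
        callback()
    return received
```

Calling demo() returns the list of results delivered to on_result. In the handler, q would be self._q, schedule would be IOLoop.instance().add_callback, and on_result would be self._callback.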
call_multi.py (client tester):
    import sys
    from tornado.ioloop import IOLoop
    from tornado import httpclient


    def run(slow):
        def show_response(res):
            print res.body

        # make 3 "slow" requests on server
        requests = []
        for k in xrange(3):
            uri = 'http://localhost:8888/{}?id={}'
            requests.append(uri.format(slow, str(k + 1)))

        # followed by 20 "fast" requests
        for k in xrange(20):
            uri = 'http://localhost:8888/fast?id={}'
            requests.append(uri.format(k + 1))

        # show results as they return
        http_client = httpclient.AsyncHTTPClient()

        print 'Scheduling Get Requests:'
        print '------------------------'
        for req in requests:
            print req
            http_client.fetch(req, show_response)

        # execute requests on server
        print '\nStart sending requests....'
        IOLoop.instance().start()

    if __name__ == '__main__':
        scenario = sys.argv[1]

        if scenario == 'slow' or scenario == 'slow_threaded':
            run(scenario)
By running python call_multi.py slow (the blocking behavior):
    Scheduling Get Requests:
    ------------------------
    http://localhost:8888/slow?id=1
    http://localhost:8888/slow?id=2
    http://localhost:8888/slow?id=3
    http://localhost:8888/fast?id=1
    http://localhost:8888/fast?id=2
    http://localhost:8888/fast?id=3
    http://localhost:8888/fast?id=4
    http://localhost:8888/fast?id=5
    http://localhost:8888/fast?id=6
    http://localhost:8888/fast?id=7
    http://localhost:8888/fast?id=8
    http://localhost:8888/fast?id=9
    http://localhost:8888/fast?id=10
    http://localhost:8888/fast?id=11
    http://localhost:8888/fast?id=12
    http://localhost:8888/fast?id=13
    http://localhost:8888/fast?id=14
    http://localhost:8888/fast?id=15
    http://localhost:8888/fast?id=16
    http://localhost:8888/fast?id=17
    http://localhost:8888/fast?id=18
    http://localhost:8888/fast?id=19
    http://localhost:8888/fast?id=20

    Start sending requests....
    slow result 1 <--- 1.338 s
    fast result 1
    fast result 2
    fast result 3
    fast result 4
    fast result 5
    fast result 6
    fast result 7
    slow result 2 <--- 1.169 s
    slow result 3 <--- 1.130 s
    fast result 8
    fast result 9
    fast result 10
    fast result 11
    fast result 13
    fast result 12
    fast result 14
    fast result 15
    fast result 16
    fast result 18
    fast result 17
    fast result 19
    fast result 20
By running python call_multi.py slow_threaded (the desired behavior):
    Scheduling Get Requests:
    ------------------------
    http://localhost:8888/slow_threaded?id=1
    http://localhost:8888/slow_threaded?id=2
    http://localhost:8888/slow_threaded?id=3
    http://localhost:8888/fast?id=1
    http://localhost:8888/fast?id=2
    http://localhost:8888/fast?id=3
    http://localhost:8888/fast?id=4
    http://localhost:8888/fast?id=5
    http://localhost:8888/fast?id=6
    http://localhost:8888/fast?id=7
    http://localhost:8888/fast?id=8
    http://localhost:8888/fast?id=9
    http://localhost:8888/fast?id=10
    http://localhost:8888/fast?id=11
    http://localhost:8888/fast?id=12
    http://localhost:8888/fast?id=13
    http://localhost:8888/fast?id=14
    http://localhost:8888/fast?id=15
    http://localhost:8888/fast?id=16
    http://localhost:8888/fast?id=17
    http://localhost:8888/fast?id=18
    http://localhost:8888/fast?id=19
    http://localhost:8888/fast?id=20

    Start sending requests....
    fast result 1
    fast result 2
    fast result 3
    fast result 4
    fast result 5
    fast result 6
    fast result 7
    fast result 8
    fast result 9
    fast result 10
    fast result 11
    fast result 12
    fast result 13
    fast result 14
    fast result 15
    fast result 19
    fast result 20
    fast result 17
    fast result 16
    fast result 18
    slow result 2 <--- 2.485 s
    slow result 3 <--- 2.491 s
    slow result 1 <--- 2.517 s
Modern web frameworks like Flask, Django, and Tornado are all able to handle multiple requests simultaneously. The term multitasking is vague because it is interpreted in several ways. You can perform multitasking using multiprocessing, multithreading, or asyncio.
Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed.
If you are using Ubuntu, open a terminal and type python. Then import tornado and evaluate tornado.version. You will get a version string as output, for example one beginning with 4.2.
If you're willing to use concurrent.futures.ProcessPoolExecutor instead of multiprocessing, this is actually very simple. Tornado's ioloop already supports concurrent.futures.Future, so they'll play nicely together out of the box. concurrent.futures is included in Python 3.2+, and has been backported to Python 2.x.
Here's an example:
    import time
    from concurrent.futures import ProcessPoolExecutor
    from tornado.ioloop import IOLoop
    from tornado import gen

    def f(a, b, c, blah=None):
        # single-argument print() behaves the same on Python 2 and 3
        print("got %s %s %s and %s" % (a, b, c, blah))
        time.sleep(5)
        return "hey there"

    @gen.coroutine
    def test_it():
        pool = ProcessPoolExecutor(max_workers=1)
        fut = pool.submit(f, 1, 2, 3, blah="ok")  # This returns a concurrent.futures.Future
        print("running it asynchronously")
        ret = yield fut
        print("it returned %s" % ret)
        pool.shutdown()

    IOLoop.instance().run_sync(test_it)
Output:
    running it asynchronously
    got 1 2 3 and ok
    it returned hey there
ProcessPoolExecutor has a more limited API than multiprocessing.Pool, but if you don't need the more advanced features of multiprocessing.Pool, it's worth using because the integration is so much simpler.
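For readers on Python 3, the same idea can be sketched with the standard library's asyncio loop, which Tornado 5 and later run on. This is not the code from the answer above: it swaps in asyncio's run_in_executor for Tornado's coroutine decorator, and it uses a ThreadPoolExecutor with a sleeping stand-in task so the sketch runs anywhere. The names slow_task, handle_requests and demo are hypothetical. For CPU-bound work a ProcessPoolExecutor would be passed in the same way, since both executors share one interface.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(seconds):
    """Hypothetical stand-in for the blocking "heavy lifting" call."""
    time.sleep(seconds)
    return "slow done"


async def handle_requests(executor):
    """Start one slow job on the executor and keep doing fast work."""
    loop = asyncio.get_running_loop()
    order = []
    # run_in_executor wraps the executor's future so it can be awaited.
    slow = loop.run_in_executor(executor, slow_task, 0.2)
    for i in range(3):
        await asyncio.sleep(0)  # yield to the loop, as a fast handler would
        order.append("fast %d" % (i + 1))
    order.append(await slow)    # resumes only when the slow job is done
    return order


def demo():
    # Assumption: a thread pool is enough for this sleeping stand-in task.
    with ThreadPoolExecutor(max_workers=1) as executor:
        return asyncio.run(handle_requests(executor))
```

Calling demo() returns the fast entries first and the slow result last, mirroring the ordering seen in the slow_threaded run of the question.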