
How to best perform Multiprocessing within requests with the python Tornado server?

I am using Tornado, the non-blocking I/O Python server. I have a class of GET requests which may take a significant amount of time to complete (think in the range of 5-10 seconds). The problem is that Tornado blocks on these requests, so subsequent fast requests are held up until the slow request completes.

I looked at https://github.com/facebook/tornado/wiki/Threading-and-concurrency and came to the conclusion that I wanted some combination of #3 (other processes) and #4 (other threads). #4 on its own had issues: I was unable to get control back to the ioloop reliably while another thread was doing the "heavy lifting". (I assume this was due to the GIL and the fact that the heavy_lifting task has a high CPU load and keeps pulling control away from the main ioloop, but that's a guess.)

So I have been prototyping a solution that does the "heavy lifting" for these slow GET requests in a separate process and then places a callback back into the Tornado ioloop when the process is done, to finish the request. This frees up the ioloop to handle other requests.

I have created a simple example demonstrating a possible solution, but am curious to get feedback from the community on it.

My question is two-fold: How can this current approach be simplified? What pitfalls potentially exist with it?

The Approach

  1. Utilize Tornado's builtin asynchronous decorator which allows a request to stay open and for the ioloop to continue.

  2. Spawn a separate process for "heavy lifting" tasks using Python's multiprocessing module. I first attempted to use the threading module but could not get control reliably relinquished back to the ioloop. It also appears that multiprocessing would take advantage of multiple cores.

  3. Start a 'watcher' thread in the main ioloop process using the threading module, whose job is to watch a multiprocessing.Queue for the result of the "heavy lifting" task when it completes. This was needed because I had to know when the heavy_lifting task had completed while still being able to notify the ioloop that the request was now finished.

  4. Be sure that the 'watcher' thread relinquishes control to the main ioloop often with time.sleep(0) calls so that other requests continue to be processed promptly.

  5. When there is a result in the queue, add a callback from the 'watcher' thread using tornado.ioloop.IOLoop.instance().add_callback(), which is documented to be the only safe way to call ioloop instances from other threads.

  6. Be sure to then call finish() in the callback to complete the request and hand over a reply.
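As a rough illustration of steps 3 to 5, here is a minimal sketch of the watcher hand-off using only the standard library. It is not the Tornado code from this question: it assumes Python 3, uses asyncio's `call_soon_threadsafe` as a stand-in for `IOLoop.add_callback()`, and runs the worker in a thread with a `queue.Queue` rather than in a separate process. The names `worker`, `watcher` and `handle_request` are hypothetical.

```python
import asyncio
import queue
import threading


def worker(q):
    # Stand-in for the heavy task; puts its result on the queue when done.
    q.put(sum(range(100000)))


def watcher(q, loop, callback):
    # Block until a result arrives (no polling loop), then schedule the
    # callback on the event loop thread using the thread-safe call.
    result = q.get()
    loop.call_soon_threadsafe(callback, result)


async def handle_request():
    # The request stays open (awaits a future) while the loop remains free.
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    q = queue.Queue()
    threading.Thread(target=worker, args=(q,)).start()
    threading.Thread(target=watcher, args=(q, loop, done.set_result)).start()
    return await done


if __name__ == "__main__":
    print(asyncio.run(handle_request()))
```

The blocking `q.get()` is used here in place of a `time.sleep(0)` polling loop, which would keep a CPU core busy while waiting; `multiprocessing.Queue` offers the same blocking `get()`.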

Below is some sample code showing this approach. multi_tornado.py is the server implementing the above outline, and call_multi.py is a sample script that calls the server in two different ways to test it. Both tests call the server with 3 slow GET requests followed by 20 fast GET requests. The results are shown for runs both with and without threading turned on.

When running with "no threading", the 3 slow requests block (each taking a little over a second to complete). A few of the 20 fast requests squeeze through between some of the slow requests within the ioloop (I am not totally sure how that occurs, but it could be an artifact of running both the server and the client test script on the same machine). The point is that all of the fast requests are held up to varying degrees.

With threading enabled, the 20 fast requests all complete immediately, and the three slow requests complete at about the same time afterwards, since they have been running in parallel. This is the desired behavior. The three slow requests take about 2.5 seconds to complete in parallel, whereas in the non-threaded case they take about 3.5 seconds in total. So the slow requests finish in roughly 30% less wall-clock time overall (I assume due to multicore sharing). More importantly, the fast requests were handled immediately, ahead of the slow ones.

I do not have a lot of experience with multithreaded programming, so while this seemingly works here, I am curious to learn:

Is there a simpler way to accomplish this? What monsters may lurk within this approach?

(Note: A future tradeoff may be to just run more instances of Tornado with a reverse proxy like nginx doing load balancing. No matter what I will be running multiple instances with a load balancer - but I am concerned about just throwing hardware at this problem since it seems that the hardware is so directly coupled to the problem in terms of the blocking.)

Sample Code

multi_tornado.py (sample server):

    import time
    import threading
    import multiprocessing
    import math

    from tornado.web import RequestHandler, Application, asynchronous
    from tornado.ioloop import IOLoop


    # run in some other process - put result in q
    def heavy_lifting(q):
        t0 = time.time()
        for k in range(2000):
            math.factorial(k)

        t = time.time()
        q.put(t - t0)  # report time to compute in queue


    class FastHandler(RequestHandler):
        def get(self):
            res = 'fast result ' + self.get_argument('id')
            print res
            self.write(res)
            self.flush()


    class MultiThreadedHandler(RequestHandler):
        # Note:  This handler can be called with threaded = True or False
        def initialize(self, threaded=True):
            self._threaded = threaded
            self._q = multiprocessing.Queue()

        def start_process(self, worker, callback):
            # method to start process and watcher thread
            self._callback = callback

            if self._threaded:
                # launch process
                multiprocessing.Process(target=worker, args=(self._q,)).start()

                # start watching for process to finish
                threading.Thread(target=self._watcher).start()

            else:
                # threaded = False just call directly and block
                worker(self._q)
                self._watcher()

        def _watcher(self):
            # watches the queue for process result
            while self._q.empty():
                time.sleep(0)  # relinquish control if not ready

            # put callback back into the ioloop so we can finish request
            response = self._q.get(False)
            IOLoop.instance().add_callback(lambda: self._callback(response))


    class SlowHandler(MultiThreadedHandler):
        @asynchronous
        def get(self):
            # start a thread to watch for
            self.start_process(heavy_lifting, self._on_response)

        def _on_response(self, delta):
            _id = self.get_argument('id')
            res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
            print res
            self.write(res)
            self.flush()
            self.finish()   # be sure to finish request


    application = Application([
        (r"/fast", FastHandler),
        (r"/slow", SlowHandler, dict(threaded=False)),
        (r"/slow_threaded", SlowHandler, dict(threaded=True)),
    ])


    if __name__ == "__main__":
        application.listen(8888)
        IOLoop.instance().start()

call_multi.py (client tester):

    import sys
    from tornado.ioloop import IOLoop
    from tornado import httpclient


    def run(slow):
        def show_response(res):
            print res.body

        # make 3 "slow" requests on server
        requests = []
        for k in xrange(3):
            uri = 'http://localhost:8888/{}?id={}'
            requests.append(uri.format(slow, str(k + 1)))

        # followed by 20 "fast" requests
        for k in xrange(20):
            uri = 'http://localhost:8888/fast?id={}'
            requests.append(uri.format(k + 1))

        # show results as they return
        http_client = httpclient.AsyncHTTPClient()

        print 'Scheduling Get Requests:'
        print '------------------------'
        for req in requests:
            print req
            http_client.fetch(req, show_response)

        # execute requests on server
        print '\nStart sending requests....'
        IOLoop.instance().start()

    if __name__ == '__main__':
        scenario = sys.argv[1]

        if scenario == 'slow' or scenario == 'slow_threaded':
            run(scenario)

Test Results

By running python call_multi.py slow (the blocking behavior):

    Scheduling Get Requests:
    ------------------------
    http://localhost:8888/slow?id=1
    http://localhost:8888/slow?id=2
    http://localhost:8888/slow?id=3
    http://localhost:8888/fast?id=1
    http://localhost:8888/fast?id=2
    http://localhost:8888/fast?id=3
    http://localhost:8888/fast?id=4
    http://localhost:8888/fast?id=5
    http://localhost:8888/fast?id=6
    http://localhost:8888/fast?id=7
    http://localhost:8888/fast?id=8
    http://localhost:8888/fast?id=9
    http://localhost:8888/fast?id=10
    http://localhost:8888/fast?id=11
    http://localhost:8888/fast?id=12
    http://localhost:8888/fast?id=13
    http://localhost:8888/fast?id=14
    http://localhost:8888/fast?id=15
    http://localhost:8888/fast?id=16
    http://localhost:8888/fast?id=17
    http://localhost:8888/fast?id=18
    http://localhost:8888/fast?id=19
    http://localhost:8888/fast?id=20

    Start sending requests....
    slow result 1 <--- 1.338 s
    fast result 1
    fast result 2
    fast result 3
    fast result 4
    fast result 5
    fast result 6
    fast result 7
    slow result 2 <--- 1.169 s
    slow result 3 <--- 1.130 s
    fast result 8
    fast result 9
    fast result 10
    fast result 11
    fast result 13
    fast result 12
    fast result 14
    fast result 15
    fast result 16
    fast result 18
    fast result 17
    fast result 19
    fast result 20

By running python call_multi.py slow_threaded (the desired behavior):

    Scheduling Get Requests:
    ------------------------
    http://localhost:8888/slow_threaded?id=1
    http://localhost:8888/slow_threaded?id=2
    http://localhost:8888/slow_threaded?id=3
    http://localhost:8888/fast?id=1
    http://localhost:8888/fast?id=2
    http://localhost:8888/fast?id=3
    http://localhost:8888/fast?id=4
    http://localhost:8888/fast?id=5
    http://localhost:8888/fast?id=6
    http://localhost:8888/fast?id=7
    http://localhost:8888/fast?id=8
    http://localhost:8888/fast?id=9
    http://localhost:8888/fast?id=10
    http://localhost:8888/fast?id=11
    http://localhost:8888/fast?id=12
    http://localhost:8888/fast?id=13
    http://localhost:8888/fast?id=14
    http://localhost:8888/fast?id=15
    http://localhost:8888/fast?id=16
    http://localhost:8888/fast?id=17
    http://localhost:8888/fast?id=18
    http://localhost:8888/fast?id=19
    http://localhost:8888/fast?id=20

    Start sending requests....
    fast result 1
    fast result 2
    fast result 3
    fast result 4
    fast result 5
    fast result 6
    fast result 7
    fast result 8
    fast result 9
    fast result 10
    fast result 11
    fast result 12
    fast result 13
    fast result 14
    fast result 15
    fast result 19
    fast result 20
    fast result 17
    fast result 16
    fast result 18
    slow result 2 <--- 2.485 s
    slow result 3 <--- 2.491 s
    slow result 1 <--- 2.517 s

Rocketman asked Mar 13 '13 01:03



1 Answer

If you're willing to use concurrent.futures.ProcessPoolExecutor instead of multiprocessing, this is actually very simple. Tornado's ioloop already supports concurrent.futures.Future, so they'll play nicely together out of the box. concurrent.futures is included in Python 3.2+, and has been backported to Python 2.x.

Here's an example:

    import time
    from concurrent.futures import ProcessPoolExecutor
    from tornado.ioloop import IOLoop
    from tornado import gen

    def f(a, b, c, blah=None):
        print("got %s %s %s and %s" % (a, b, c, blah))
        time.sleep(5)
        return "hey there"

    @gen.coroutine
    def test_it():
        pool = ProcessPoolExecutor(max_workers=1)
        fut = pool.submit(f, 1, 2, 3, blah="ok")  # This returns a concurrent.futures.Future
        print("running it asynchronously")
        ret = yield fut
        print("it returned %s" % ret)
        pool.shutdown()

    IOLoop.instance().run_sync(test_it)

Output:

    running it asynchronously
    got 1 2 3 and ok
    it returned hey there

ProcessPoolExecutor has a more limited API than multiprocessing.Pool, but if you don't need the more advanced features of multiprocessing.Pool, it's worth using because the integration is so much simpler.
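Applied to the question's fast and slow requests, the same idea might look like the sketch below. It is only an illustration under stated assumptions: it assumes Python 3 and uses the standard library's asyncio loop with `run_in_executor` rather than a Tornado `RequestHandler`, so it runs without Tornado installed. `slow_request` and `fast_request` are hypothetical stand-ins for `SlowHandler` and `FastHandler`, and `heavy_lifting` is adapted to return its timing instead of writing to a queue.

```python
import asyncio
import math
import time
from concurrent.futures import ProcessPoolExecutor


def heavy_lifting(n=2000):
    # CPU-bound work adapted from the question; returns elapsed seconds.
    t0 = time.time()
    for k in range(n):
        math.factorial(k)
    return time.time() - t0


async def slow_request(executor, request_id, n=2000):
    # Run heavy_lifting in the executor; the event loop stays free meanwhile.
    loop = asyncio.get_running_loop()
    delta = await loop.run_in_executor(executor, heavy_lifting, n)
    return "slow result {} <--- {:0.3f} s".format(request_id, delta)


async def fast_request(request_id):
    # Cheap request that never leaves the event loop.
    return "fast result {}".format(request_id)


async def main():
    # One pool shared by all requests, rather than one process per request.
    with ProcessPoolExecutor(max_workers=3) as pool:
        jobs = [slow_request(pool, k + 1) for k in range(3)]
        jobs += [fast_request(k + 1) for k in range(20)]
        for line in await asyncio.gather(*jobs):
            print(line)


if __name__ == "__main__":
    asyncio.run(main())
```

Passing the executor in as a parameter keeps the request code independent of the pool type, so a `ThreadPoolExecutor` can be swapped in for I/O-bound work.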

dano answered Oct 11 '22 14:10