Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Tornado memory leak on dropped connections

I've got a setup where Tornado is used as kind of a pass-through for workers. Request is received by Tornado, which sends this request to N workers, aggregates results and sends it back to client. Which works fine, except when for some reason timeout occurs — then I've got memory leak.

I've got a setup which similar to this pseudocode:

workers = ["http://worker1.example.com:1234/",
           "http://worker2.example.com:1234/", 
           "http://worker3.example.com:1234/" ...]

class MyHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def post(self):
        responses = []

        def __callback(response):
            responses.append(response)
            if len(responses) == len(workers):
                self._finish_req(responses)

        for url in workers:
            async_client = tornado.httpclient.AsyncHTTPClient()
            request = tornado.httpclient.HTTPRequest(url, method=self.request.method, body=body)
            async_client.fetch(request, __callback) 

    def _finish_req(self, responses):
        good_responses = [r for r in responses if not r.error]
        if not good_responses:
            raise tornado.web.HTTPError(500, "\n".join(str(r.error) for r in responses))
        results = aggregate_results(good_responses)
        self.set_header("Content-Type", "application/json")
        self.write(json.dumps(results))
        self.finish()

application = tornado.web.Application([
    (r"/", MyHandler),
])

if __name__ == "__main__":
    ##.. some locking code 
    application.listen()
    tornado.ioloop.IOLoop.instance().start()

What am I doing wrong? Where does the memory leak come from?

like image 537
vartec Avatar asked Sep 27 '12 16:09

vartec


1 Answers

I don't know the source of the problem, and it seems gc should be able to take care of it, but there's two things you can try.

The first method would be to simplify some of the references (it looks like there may still be references to responses when the RequestHandler completes):

class MyHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def post(self):
        self.responses = []

        for url in workers:
            async_client = tornado.httpclient.AsyncHTTPClient()
            request = tornado.httpclient.HTTPRequest(url, method=self.request.method, body=body)
            async_client.fetch(request, self._handle_worker_response) 

    def _handle_worker_response(self, response):
        self.responses.append(response)
        if len(self.responses) == len(workers):
            self._finish_req()

    def _finish_req(self):
        ....

If that doesn't work, you can always invoke garbage collection manually:

import gc
class MyHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def post(self):
        ....

    def _finish_req(self):
        ....

    def on_connection_close(self):
        gc.collect()
like image 56
Cole Maclean Avatar answered Sep 21 '22 07:09

Cole Maclean