
Asynchronous Pool of Connections in Tornado with multiple processes

I'm using Tornado 4.2.1 together with the tornadoes 2.4.1 library to query my Elasticsearch database, and I'm looking for a way to initialize a pool of connections that can be shared between several RequestHandler instances in a multi-process service.

Is it possible to do that? Are there specific libraries for Tornado to do that?

Thanks in advance

asked Mar 13 '23 00:03 by alauri


1 Answer

Since tornado-es is just an HTTP client, ESConnection uses AsyncHTTPClient internally. A new TCP connection is made for every request unless a Connection: keep-alive header is specified.

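from tornadoes import ESConnection
conn = ESConnection()
# extra keyword arguments applied to every HTTP request the connection sends
conn.httprequest_kwargs['headers'] = {'Connection': 'keep-alive'}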

I haven't tested this, but it should work. I used a similar setup in Ruby (with the patron HTTP client), and it works well.

Next thing

AsyncHTTPClient limits the number of simultaneous requests (fetch calls) per IOLoop. Any request that hits the limit is simply queued internally until a slot becomes free.

You may want to increase the global limit:

AsyncHTTPClient.configure(None, max_clients=50)

or separate the client with its own limit (force_instance):

from tornadoes import ESConnection
from tornado.httpclient import AsyncHTTPClient

class CustomESConnection(ESConnection):

    def __init__(self, host='localhost', port='9200', io_loop=None, protocol='http', max_clients=20):
        super(CustomESConnection, self).__init__(host, port, io_loop, protocol)
        self.client = AsyncHTTPClient(force_instance=True, max_clients=max_clients)
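
The queueing effect of max_clients can be pictured with a small stand-alone sketch. This is only an analogy and not how Tornado implements it: it assumes Python 3.7+ and uses an asyncio.Semaphore in place of the client's internal queue, with a hypothetical fake_fetch standing in for a real HTTP request.

```python
import asyncio


async def run_with_limit(n_requests, max_clients, delay=0.01):
    """Run n_requests fake fetches, allowing at most max_clients at once."""
    slots = asyncio.Semaphore(max_clients)
    in_flight = 0
    peak = 0

    async def fake_fetch(i):
        # Hypothetical stand-in for AsyncHTTPClient.fetch(); no network I/O.
        nonlocal in_flight, peak
        async with slots:  # requests beyond the limit wait here
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(delay)  # pretend HTTP round trip
            in_flight -= 1
        return i

    results = await asyncio.gather(*(fake_fetch(i) for i in range(n_requests)))
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(run_with_limit(n_requests=10, max_clients=3))
    print(len(results), "requests completed; peak concurrency:", peak)
```

All ten requests complete, but never more than three run at the same time; the rest wait their turn, which is roughly what happens to fetch calls above max_clients.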

And finally

To reuse the same ESConnection, you can create it in the Application, since the application object is available in every request handler (RequestHandler) as self.application.

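import tornado.ioloop
from tornado import gen
from tornado.web import Application, RequestHandler
from tornadoes import ESConnection

class MainHandler(RequestHandler):
    @gen.coroutine  # lets `yield` suspend the handler until the search completes
    def get(self):
        # every handler reaches the shared connection through the application
        yield self.application.es.search('something')


class MyApp(Application):

    def __init__(self, *args, **kwargs):
        super(MyApp, self).__init__(*args, **kwargs)

        # one ESConnection per application, reused by all handlers
        self.es = ESConnection()

if __name__ == "__main__":
    application = MyApp([
        (r"/", MainHandler),
    ])
    application.listen(8888)
    tornado.ioloop.IOLoop.current().start()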

Multiprocess

Actually, there is no easy way. The common approach is a pooler, which is mostly used when persistent connections are needed, as with databases (pgbouncer for Postgres), or as an optimization in a high-load service.

You would have to write the pooler yourself: a gateway application that sits in front of Elasticsearch.

subprocess1 
           \  (http, zmq, ...)
            \            
              > pooler (some queue and tornadoes api) - http -> elasticsearch
            /
           /
subprocess2

The subprocesses could communicate with the pooler via HTTP, ØMQ (there are many examples, even of poolers) or some other form of IPC (sockets, ...).
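
The layout in the diagram can be sketched with the standard library alone. This is only a rough illustration under several assumptions: it uses multiprocessing queues as the IPC channel instead of HTTP or ØMQ, it assumes a POSIX system where the "fork" start method is available, and the names pooler, worker and run_demo are hypothetical. The Elasticsearch call is replaced by a placeholder string.

```python
import multiprocessing as mp

# Assumption: a POSIX system, so the "fork" start method is available.
ctx = mp.get_context("fork")


def pooler(requests, replies):
    """Own the single backend connection and serve queries from all workers."""
    while True:
        item = requests.get()
        if item is None:  # sentinel: shut down
            break
        worker_id, query = item
        # Placeholder: a real pooler would query Elasticsearch here.
        replies[worker_id].put("result for " + query)


def worker(worker_id, requests, reply, results):
    """Send one query to the pooler and pass the answer on to the parent."""
    requests.put((worker_id, "query-%d" % worker_id))
    results.put(reply.get())


def run_demo(n_workers=2):
    requests = ctx.Queue()  # shared: workers -> pooler
    results = ctx.Queue()   # shared: workers -> parent
    replies = {i: ctx.Queue() for i in range(n_workers)}  # pooler -> worker i

    pool_proc = ctx.Process(target=pooler, args=(requests, replies))
    pool_proc.start()
    workers = [
        ctx.Process(target=worker, args=(i, requests, replies[i], results))
        for i in range(n_workers)
    ]
    for w in workers:
        w.start()

    # Collect the answers before joining so no child blocks on a full pipe.
    answers = sorted(results.get() for _ in workers)
    for w in workers:
        w.join()
    requests.put(None)  # tell the pooler to stop
    pool_proc.join()
    return answers


if __name__ == "__main__":
    print(run_demo())
```

Only the pooler process would ever talk to Elasticsearch, so the number of backend connections stays fixed no matter how many subprocesses are started.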

answered Apr 06 '23 21:04 by kwarunek