I'm using both Tornado 4.2.1 and tornadoes 2.4.1 libraries to query my Elasticsearch database and I'm looking for a way to initialize a Pool of connections to shared between several RequestHandler instances in a multiple processes service.
Is it possible to do that? Are there specific libraries for Tornado to do that?
Thanks in advance
Since tornado-es
is just a HTTP client, it uses AsyncHTTPClient
in the ESConnection
. The new TCP connection is made every request, unless Connection: keep-alive
header is specified.
conn = ESConnection()
conn.httprequest_kwargs['headers'] = {'Connection': 'keep-alive'}
I've not tested, but it should work. I used similar setup in ruby (with patron http client), and it works well
Next thing
AsyncHTTPClient
has limit of maximum number of simultaneous requests (fetch
) per ioloop. Every request that hit the limit is just queued internally.
You may want to increase the global limit:
AsyncHTTPClient.configure(None, max_clients=50)
or separate the client with its own limit (force_instance
):
from tornadoes import ESConnection
from tornado.httpclient import AsyncHTTPClient
class CustomESConnection(ESConnection):
def __init__(self, , host='localhost', port='9200', io_loop=None, protocol='http', max_clients=20):
super(CustomESConnection, self).__init__(host, port, io_loop, protocol)
self.client = AsyncHTTPClient(force_instance=True, max_clients=max_clients)
And finally
To reuse the same ESConnection you can create it in the Application, since the application is available with every request (RequestHandler)
from tornado.web import Application, RequestHandler
from tornadoes import ESConnection
class MainHandler(RequestHandler):
def get(self):
yield self.application.es.search('something')
class MyApp(Application):
def __init__(self, *args, **kwargs):
super(MyApp, self).__init__(*args, **kwargs)
self.es = ESconnection()
if __name__ == "__main__":
application = MyApp([
(r"/", MainHandler),
])
application.listen(8888)
tornado.ioloop.IOLoop.current().start()
Multiprocess
Actually there is no easy way. The common approach is a pooler, which is used mostly when persistent connection is needed, like databases (pgbouncer for postgres) or as a optimization on high-load service.
And you will have to write a pooler, a gateway application to es
subprocess1
\ (http, zmq, ...)
\
> pooler (some queue and tornadoes api) - http -> elastisearch
/
/
subprocess2
The subprocesses could communicate with pooler via HTTP, ØMQ (there are many examples even pooler) or some implementation of IPC (sockects, ...).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With