Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What is the proper way to handle Redis connection in Tornado ? (Async - Pub/Sub)

I am using Redis along with my Tornado application with asyc client Brukva, when I looked at the sample apps at Brukva site they are making new connection on "init" method in websocket

class MessagesCatcher(tornado.websocket.WebSocketHandler):
    def __init__(self, *args, **kwargs):
        super(MessagesCatcher, self).__init__(*args, **kwargs)
        self.client = brukva.Client()
        self.client.connect()
        self.client.subscribe('test_channel')

    def open(self):
        self.client.listen(self.on_message)

    def on_message(self, result):
        self.write_message(str(result.body))

    def close(self):
        self.client.unsubscribe('test_channel')
        self.client.disconnect()

its fine in the case of websocket but how to handle it in the common Tornado RequestHandler post method say long polling operation (publish-subscribe model). I am making new client connetion in every post method of update handler is this the right approach ?? When I checked at the redis console I see that clients increasing in every new post operation.

enter image description here

Here is a sample of my code.

c = brukva.Client(host = '127.0.0.1')
c.connect()

class MessageNewHandler(BaseHandler):
    @tornado.web.authenticated
    def post(self):

        self.listing_id = self.get_argument("listing_id")
        message = {
            "id": str(uuid.uuid4()),
            "from": str(self.get_secure_cookie("username")),
            "body": str(self.get_argument("body")),
        }
        message["html"] = self.render_string("message.html", message=message)

        if self.get_argument("next", None):
            self.redirect(self.get_argument("next"))
        else:
            c.publish(self.listing_id, message)
            logging.info("Writing message : " + json.dumps(message))
            self.write(json.dumps(message))

    class MessageUpdatesHandler(BaseHandler):
        @tornado.web.authenticated
        @tornado.web.asynchronous
        def post(self):
            self.listing_id = self.get_argument("listing_id", None)
            self.client = brukva.Client()
            self.client.connect()
            self.client.subscribe(self.listing_id)
            self.client.listen(self.on_new_messages)

        def on_new_messages(self, messages):
            # Closed client connection
            if self.request.connection.stream.closed():
                return
            logging.info("Getting update : " + json.dumps(messages.body))
            self.finish(json.dumps(messages.body))
            self.client.unsubscribe(self.listing_id)


        def on_connection_close(self):
            # unsubscribe user from channel
            self.client.unsubscribe(self.listing_id)
            self.client.disconnect()

I appreciate if you provide some sample code for similar case.

like image 813
Burak Dede Avatar asked Dec 11 '11 19:12

Burak Dede


2 Answers

A little late but, I've been using tornado-redis. It works with tornado's ioloop and the tornado.gen module

Install tornadoredis

It can be installed from pip

pip install tornadoredis

or with setuptools

easy_install tornadoredis

but you really shouldn't do that. You could also clone the repository and extract it. Then run

python setup.py build
python setup.py install

Connect to redis

The following code goes in your main.py or equivalent

redis_conn = tornadoredis.Client('hostname', 'port')
redis_conn.connect()

redis.connect is called only once. It is a blocking call, so it should be called before starting the main ioloop. The same connection object is shared between all the handlers.

You could add it to your application settings like

settings = {
    redis = redis_conn
}
app = tornado.web.Application([('/.*', Handler),],
                              **settings)

Use tornadoredis

The connection can be used in handlers as self.settings['redis'] or it can be added as a property of the BaseHandler class. Your request handlers subclass that class and access the property.

class BaseHandler(tornado.web.RequestHandler):

    @property
    def redis():
        return self.settings['redis']

To communicate with redis, the tornado.web.asynchronous and the tornado.gen.engine decorators are used

class SomeHandler(BaseHandler):

    @tornado.web.asynchronous
    @tornado.gen.engine
    def get(self):
        foo = yield gen.Task(self.redis.get, 'foo')
        self.render('sometemplate.html', {'foo': foo}

Extra information

More examples and other features like connection pooling and pipelines can be found at the github repo.

like image 61
Ananth Avatar answered Oct 19 '22 17:10

Ananth


you should pool the connections in your app. since it seems like brukva doesn't support this automatically (redis-py supports this, but is blocking by nature so it doesn't go well with tornado), you need to write your own connection pool.

the pattern is pretty simple, though. something along these lines (this is not real operational code):

class BrukvaPool():

    __conns = {}


    def get(host, port,db):
        ''' Get a client for host, port, db '''

        key = "%s:%s:%s" % (host, port, db)

        conns = self.__conns.get(key, [])
        if conns:
            ret = conns.pop()
            return ret
        else:
           ## Init brukva client here and connect it

    def release(client):
        ''' release a client at the end of a request '''
        key = "%s:%s:%s" % (client.connection.host, client.connection.port, client.connection.db)
        self.__conns.setdefault(key, []).append(client)

it can be a bit more tricky, but that's the main idea.

like image 39
Not_a_Golfer Avatar answered Oct 19 '22 16:10

Not_a_Golfer