Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to communicate RabbitMQ(Pika library) in tornado application

Pika library support tornado adapter, here is an example about how to publish message using Asynchronous adapter.

I want use pika in tornado application, just an example, I want put tornado request data to RabbitMQ, But don't know how to do it.

Two question don't know how to solve.

1 Pika use tornado adapter has its own ioloop,

self._connection = pika.SelectConnection(pika.URLParameters(self._url),  
                                         self.on_connection_open)  
self._connection.ioloop.start()

Tornado application has its own ioloop,

tornado.ioloop.IOLoop.instance().start()

How to combine those two ioloop?

2 The Pika example publish same message again and again, but I want to publish request data, how to pass request data to publish method?

like image 642
linbo Avatar asked Jul 09 '13 02:07

linbo


1 Answers

On my search for exactly the same thing I found this blog post of Kevin Jing Qiu.

I went the rabbitmq hole a bit further to give every websocket his own set of channel and queues.

The extract from my project can be found below. A tornado application bound to RabbitMQ consists of these parts:

  1. The Tornado Application that will handle web requests. I only see long lived websockets here, but you can do so with short lived http requests as well.
  2. A (one) RabbitMQ connection hold by the PikaClient Instance
  3. a web connection that defines its channels, queues and exchanges when the open method is triggered.

Now a websocket connection can receive data from tornado (data from the browser) via on_message and send it to RabbitMQ.

The websocket connection will receive data from RabbitMQ via basic_consume.

This is not fully functional, but you should get the idea.

class PikaClient(object):

    def __init__(self, io_loop):
        logger.info('PikaClient: __init__')
        self.io_loop = io_loop

        self.connected = False
        self.connecting = False
        self.connection = None
        self.channel = None
        self.message_count = 0
    """ 
    Pika-Tornado connection setup
    The setup process is a series of callback methods.
    connect:connect to rabbitmq and build connection to tornado io loop -> 
    on_connected: create a channel to rabbitmq ->
    on_channel_open: declare queue tornado, bind that queue to exchange 
                     chatserver_out and start consuming messages. 
   """

    def connect(self):
        if self.connecting:
            #logger.info('PikaClient: Already connecting to RabbitMQ')
            return

        #logger.info('PikaClient: Connecting to RabbitMQ')
        self.connecting = True

        cred = pika.PlainCredentials('guest', 'guest')
        param = pika.ConnectionParameters(
            host='localhost',
            port=5672,
            virtual_host='/',
            credentials=cred
        )
        self.connection = TornadoConnection(param,
            on_open_callback=self.on_connected,stop_ioloop_on_close=False)
        self.connection.add_on_close_callback(self.on_closed)

    def on_connected(self, connection):
        logger.info('PikaClient: connected to RabbitMQ')
        self.connected = True
        self.connection = connection
        # now you are able to call the pika api to do things
        # this could be exchange setup for websocket connections to 
        # basic_publish to later.
        self.connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        logger.info('PikaClient: Channel %s open, Declaring exchange' % channel)
        self.channel = channel

    def on_closed(self, connection):
        logger.info('PikaClient: rabbit connection closed')
        self.io_loop.stop()


class MyWebSocketHandler(websocket.WebSocketHandler):
    def __init__(self):
        self.status = 'not connected yet'

    def open(self, *args, **kwargs):
        self.status = "ws open"
        self.rabbit_connect() # connect this websocket object to rabbitmq

    def rabbit_connect():
        self.application.pc.connection.channel(self.rabbit_channel_in_ok)

    def rabbit_channel_in_ok(self,channel):
        self.channel_in = channel
        self.channel_in.queue_declare(self.rabbit_declare_ok,
                                      exclusive=True,auto_delete=True)


# and so on...


handlers = [ your_definitions_here_like_websockets_or_such ]
settings = { your_settings_here }
application = tornado.web.Application(handlers,**settings)

def main():
    io_loop = tornado.ioloop.IOLoop.instance()
    # PikaClient is our rabbitmq consumer
    pc = PikaClient(io_loop)
    application.pc = pc
    application.pc.connect()
    application.listen(config.tornadoport)
    try:
        io_loop.start()
    except KeyboardInterrupt:
        io_loop.stop()

if __name__ == '__main__':
    main()
like image 100
itsafire Avatar answered Sep 21 '22 00:09

itsafire