
How do I send a message with a simple Pika SelectConnection in Python?

I am trying to convert my code to send RabbitMQ messages via Pika instead. I am having a lot of trouble understanding how to send a simple message using an asynchronous connection (such as SelectConnection).

In my old code, which uses the amqp library, I simply declare a class like this:

import amqp

class MQ():

    mqConn = None
    channel = None

    def __init__(self):
        self.connect()

    def connect(self):
        # (Re)connect when there is no connection yet or the old one has dropped.
        if self.mqConn is None or not self.mqConn.connected:
            self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
            self.channel = self.mqConn.channel()

    def sendMQ(self, message):
        self.connect()
        lMessage = amqp.Message(message)
        self.channel.basic_publish(lMessage, exchange="DevMatrixE", routing_key="dev_matrix_q") 

And then elsewhere in my code I call sendMQ("this is my message"), and then the code continues. I do not need to listen for acknowledgements etc.

Could someone please write a simple class utilizing pika and SelectConnection that would also work to just send a message using sendMQ("this is my message")? I've looked at the pika examples but I don't know how to get around the ioloop and KeyboardInterrupt. I guess I'm just not sure how to make my code continue to run without all these try/excepts... Also, not exactly sure how I can pass my message on through all the callbacks...

Any help is appreciated!

Thanks.

TheBear asked May 19 '15 17:05


1 Answer

The whole thing is callback-driven, since it is an async way of doing things. An async consumer is easy to understand: we get each message by providing a callback function. The publisher side, however, is a bit harder to understand, at least for a beginner.

Usually we need a Queue for the communication, and the publisher gets data from it periodically.

The key to using SelectConnection is to register your publish function with the event loop, which can be done with connection.add_timeout. After you are done publishing, register the next round of publishing.

The next question is where to put the initial registration. It can be done in the channel-open callback.

Below is a code snippet for better understanding. Be aware that it is not production ready, because it publishes at most 10 messages per second. You need to adjust the publish interval and publish more than one message per callback.

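from queue import Empty  # the module is named Queue on Python 2

import pika


class MQ(object):
    def __init__(self, queue, host="localhost"):
        self.queue = queue
        self.host = host  # broker address; the original read it from args.mq_ip

    def on_channel_open(self, chn):
        # Channel ready: register the first publish round.
        # (On pika 1.0+, use self.connection.ioloop.call_later in place of add_timeout.)
        self.channel = chn
        self.connection.add_timeout(0.1, self.schedule_next_message)

    def schedule_next_message(self):
        try:
            msg = self.queue.get(True, 0.01)  # wait at most 10 ms for a message
            self.channel.basic_publish('YOUR EXCHANGE', 'YOUR ROUTING KEY', msg)
        except Empty:
            pass
        # Register the next round.
        self.connection.add_timeout(0.1, self.schedule_next_message)

    def on_open(self, conn):
        # Connection established: open a channel.
        self.connection = conn
        self.connection.channel(on_open_callback=self.on_channel_open)

    def run(self):
        # Create the connection; on_open fires once it is established.
        self.connection = pika.SelectConnection(
            pika.ConnectionParameters(heartbeat=600, host=self.host), self.on_open)
        try:
            self.connection.ioloop.start()
        except Exception:
            print("exception in publisher")
            self.connection.close()
            # Run the loop again so the close can complete.
            self.connection.ioloop.start()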
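
One way to lift the 10-messages-per-second ceiling mentioned above is to publish several queued messages per callback instead of one. The sketch below is only an illustration: drain and max_batch are hypothetical names, and a plain list stands in for channel.basic_publish so that it runs without a broker.

```python
import queue


def drain(outbox, publish, max_batch=50):
    """Publish up to max_batch queued messages without blocking; return the count."""
    sent = 0
    while sent < max_batch:
        try:
            msg = outbox.get_nowait()
        except queue.Empty:
            break  # nothing left to send in this round
        publish(msg)
        sent += 1
    return sent


# Stand-in for channel.basic_publish (assumption, keeps the sketch broker-free).
published = []
outbox = queue.Queue()
for i in range(5):
    outbox.put("msg %d" % i)

first = drain(outbox, published.append, max_batch=3)   # sends three messages
second = drain(outbox, published.append, max_batch=3)  # sends the remaining two
```

Inside schedule_next_message, a call like this could replace the single queue.get before the next round is registered. Using get_nowait also means the event loop is never held up waiting on an empty queue.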

Put MQ(queue).run() in a separate thread, and whenever you want to send a message to the MQ, just put it into the queue object.
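
The thread hand-off can be sketched roughly as follows, which also gives the sendMQ("this is my message") call from the question. To keep the sketch runnable without a broker, StubPublisher is a hypothetical stand-in for the MQ class; the None shutdown sentinel and the name outbox are likewise assumptions for illustration only.

```python
import queue
import threading


class StubPublisher:
    """Hypothetical stand-in for MQ so the sketch runs without RabbitMQ."""

    def __init__(self, outbox):
        self.outbox = outbox
        self.sent = []

    def run(self):
        # The real MQ.run() would start pika's ioloop here instead.
        while True:
            msg = self.outbox.get()
            if msg is None:  # assumed shutdown sentinel, not part of the answer
                break
            self.sent.append(msg)


outbox = queue.Queue()
publisher = StubPublisher(outbox)
worker = threading.Thread(target=publisher.run, daemon=True)
worker.start()


def sendMQ(message):
    # Returns immediately; the publisher thread picks the message up later.
    outbox.put(message)


sendMQ("this is my message")

# Shut the stub down so the example terminates cleanly.
outbox.put(None)
worker.join(timeout=5)
```

With the real class, the thread target would be MQ(outbox).run, and the calling code would carry on right after sendMQ returns, with no callbacks or try/except blocks at the call site.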

TerrenceSun answered Nov 14 '22 23:11