RabbitMQ - consuming and publishing on same channel?

I have two Python processes connected to RabbitMQ via pika. Each consumes a set of topics which the other publishes as a response. One uses the SelectConnection and the other uses the TornadoConnection.

Both are currently just test programs which simulate a conversation between a user and my server, and each program's on_message() is simply hard-coded to branch on the routing_key received and publish the appropriate response to its counterpart.

Originally, after a random amount of time, typically no longer than 2 minutes, I'd get an error like:

UnexpectedFrame: Basic.publish: (505) UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead

After searching through numerous posts here on Stack Overflow and elsewhere, I've come to understand that this error has to do with a race condition where something is being consumed before the basic_publish has completed.

I've made a change to my code so that rather than doing an immediate basic_publish(), I pass a callback to connection.add_timeout() with a delay of 1 second. After making this change, I've had numerous runs where the two processes have a "conversation" with each other for more than an hour without reproducing the error.
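One way to read the error message: in AMQP 0-9-1, class 60 is the Basic class, and a basic.publish method frame must be followed on the same channel by a content header frame (and then any body frames). If some other frame lands between the method frame and its header, the broker rejects the channel. The sketch below is a simplified model of that ordering rule, not RabbitMQ's or pika's actual code; the function name, exception class, and string frame kinds are all made up for illustration.

```python
class UnexpectedFrame(Exception):
    """Raised when a frame arrives out of the expected order (hypothetical)."""


def check_channel_frames(frames):
    """Validate a sequence of (kind, payload) frames seen on ONE channel.

    kind is "method", "header" or "body". A "method" frame carrying
    "basic.publish" must be followed directly by a "header" frame.
    Body frames are not size-checked in this simplified model.
    """
    expecting_header = False
    for kind, payload in frames:
        if expecting_header:
            if kind != "header":
                raise UnexpectedFrame(
                    "expected content header, got %s frame (%s)"
                    % (kind, payload))
            expecting_header = False
        elif kind == "method" and payload == "basic.publish":
            expecting_header = True
    return True


# A well-formed publish followed by an unrelated method frame.
in_order = [("method", "basic.publish"), ("header", "props"),
            ("body", "hi"), ("method", "basic.ack")]

# The same frames, but the ack slipped in before the content header.
interleaved = [("method", "basic.publish"), ("method", "basic.ack"),
               ("header", "props"), ("body", "hi")]
```

Calling check_channel_frames(in_order) returns True, while check_channel_frames(interleaved) raises UnexpectedFrame, which mirrors the situation the broker complains about.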

My question is, is this just a hack which works only because I'm simulating one user? Do I need to have 2 separate channels for consuming and publishing?

def on_message(self, unused_channel, basic_deliver, properties, body):
    if self._sibling_app_id == properties.app_id:
        self.dispatch_message(basic_deliver, properties, body)


def dispatch_message(self, basic_deliver, properties, body):
    (user_id, msg_type) = basic_deliver.routing_key.rsplit('.', 1)

    if "login-response" == msg_type:
        print body
    elif "gid-assignment" == msg_type:
        print body
    elif "tutor-logout" == msg_type:
        print body
    elif "tutor-turn" == msg_type:
        message = "i don't know"
        routing_key = "%s.input" % user_id
        callback = self.delayed_publish_message(routing_key, message)
        self.schedule_next_message(callback, 1)
    elif "nlu" == msg_type:
        message = "dnk"
        routing_key = "%s.nlu-response" % user_id
        callback = self.delayed_publish_message(routing_key, message)
        self.schedule_next_message(callback, 1)
    else:
        print "invalid message-type: %s" % msg_type
        print body

def delayed_publish_message(self, routing_key, message):
    """returns a callback which can be passed to schedule_next_message()"""
    def delayed_publish_cb():
        self.publish_message(routing_key, message)
    return delayed_publish_cb


def schedule_next_message(self, cb, publish_interval=None):
    if self._stopping:
        return
    if publish_interval is None:
        publish_interval = self.PUBLISH_INTERVAL
    if -1 == publish_interval:
        return
    self._connection.add_timeout(publish_interval, cb)


def publish_message(self, routing_key, message):
    if self._stopping:
        return
    properties = pika.BasicProperties(app_id=self._app_id,
                                      content_type='text/plain')
    self._channel.basic_publish(self.EXCHANGE, routing_key,
                                message, properties)
ckot asked Jul 31 '14 22:07


2 Answers

A channel is meant to be used in one direction only. The AMQP protocol specification is very clear about that:

An AMQP Session correlates two unidirectional Channels to form a bidirectional, sequential conversation between two Containers. A single Connection may have multiple independent Sessions active simultaneously, up to the negotiated Channel limit. Both Connections and Sessions are modeled by each peer as endpoints that store local and last known remote state regarding the Connection or Session in question.

Thus you are supposed to use an input and an output channel for your application.
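A minimal sketch of that layout, assuming an already-open pika asynchronous connection whose channel() method accepts an on_open_callback keyword. The class name TwoChannelClient and the on_ready hook are hypothetical, and consumer setup is left to the caller because the basic_consume signature differs between pika versions.

```python
class TwoChannelClient(object):
    """Opens one channel for consuming and one for publishing.

    `connection` is assumed to be an open asynchronous pika connection;
    any object with channel(on_open_callback=...) works for this sketch.
    """

    def __init__(self, connection, on_ready):
        self._connection = connection
        self._on_ready = on_ready          # called once both channels are open
        self.consume_channel = None
        self.publish_channel = None

    def open_channels(self):
        # Request two independent channels on the same connection.
        self._connection.channel(
            on_open_callback=self._on_consume_channel_open)
        self._connection.channel(
            on_open_callback=self._on_publish_channel_open)

    def _on_consume_channel_open(self, channel):
        self.consume_channel = channel
        self._maybe_ready()

    def _on_publish_channel_open(self, channel):
        self.publish_channel = channel
        self._maybe_ready()

    def _maybe_ready(self):
        if (self.consume_channel is not None
                and self.publish_channel is not None):
            self._on_ready(self)

    def publish(self, exchange, routing_key, body, properties=None):
        # All outgoing messages go through the dedicated publish channel.
        self.publish_channel.basic_publish(
            exchange, routing_key, body, properties)
```

In the on_ready hook you would register your consumer on consume_channel and route every response through publish(), so the two directions never share a channel.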

itsafire answered Oct 02 '22 06:10


I did my commits, was about to go to bed, and suddenly I figured it out. It turns out that the Python tutorials on rabbitmq.com still say to install pika with:

sudo pip install pika==0.9.8

And while 0.9.8 came out sometime in 2012, I think the fix was added sometime after that release, with 0.9.9 being released sometime in 2013.

So, I did:

sudo pip uninstall pika

followed by the installation instructions on the pika site:

sudo pip install pika

Then I replaced all of my connection.add_timeout(1, delayed_publish_cb) calls with direct basic_publish() calls, crossed my fingers, and ran it. My two processes exchanged about 200,000 messages with each other in less than 5 minutes without any problems.

Good to know that the bug fix from back in 2012 still works.

I'll have to let the rabbitmq folks know to update their tutorial.
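To guard against an old pin sneaking back in, a start-up check along these lines might help. This is only a sketch: the helper names are made up, the 0.9.8 baseline is simply the version mentioned above, and the parsing only looks at the leading digits of each dotted component.

```python
import re


def parse_version(version):
    """Turn a string such as '0.9.13' into a tuple such as (0, 9, 13).

    Only the leading digits of each dotted part are used, so a suffix
    like 'b1' is ignored; parsing stops at the first part with no digits.
    """
    parts = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
    return tuple(parts)


def is_newer_than(installed, baseline="0.9.8"):
    """Return True if `installed` is strictly newer than `baseline`."""
    return parse_version(installed) > parse_version(baseline)
```

With pika installed, the check would be is_newer_than(pika.__version__); for example, is_newer_than("0.9.8") is False and is_newer_than("0.9.13") is True.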

ckot answered Oct 02 '22 05:10