I have two Python processes connected to RabbitMQ via pika. Each consumes a set of topics which the other publishes in response. One uses SelectConnection and the other uses TornadoConnection.
Both are currently just test programs which simulate a conversation between a user and my server, and each program's on_message() is simply hard-coded to branch on the routing_key received and publish the appropriate response to its counterpart.
Originally, after a random amount of time, typically no longer than 2 minutes, I'd get an error like:
UnexpectedFrame: Basic.publish: (505) UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead
After searching through numerous posts here on Stack Overflow and elsewhere, I've come to understand that this error comes from a race condition in which something is consumed before the basic_publish has completed.
I've changed my code so that, instead of calling basic_publish() immediately, I pass a callback to connection.add_timeout() with a delay of 1 second. Since making this change, I've had numerous runs in which the two processes have a "conversation" with each other for more than an hour without reproducing the error.
My question is, is this just a hack which works only because I'm simulating one user? Do I need to have 2 separate channels for consuming and publishing?
    def on_message(self, unused_channel, basic_deliver, properties, body):
        if self._sibling_app_id == properties.app_id:
            self.dispatch_message(basic_deliver, properties, body)

    def dispatch_message(self, basic_deliver, properties, body):
        (user_id, msg_type) = basic_deliver.routing_key.rsplit('.', 1)
        if "login-response" == msg_type:
            print body
        elif "gid-assignment" == msg_type:
            print body
        elif "tutor-logout" == msg_type:
            print body
        elif "tutor-turn" == msg_type:
            message = "i don't know"
            routing_key = "%s.input" % user_id
            callback = self.delayed_publish_message(routing_key, message)
            self.schedule_next_message(callback, 1)
        elif "nlu" == msg_type:
            message = "dnk"
            routing_key = "%s.nlu-response" % user_id
            callback = self.delayed_publish_message(routing_key, message)
            self.schedule_next_message(callback, 1)
        else:
            print "invalid message-type: %s" % msg_type
            print body

    def delayed_publish_message(self, routing_key, message):
        """returns a callback which can be passed to schedule_next_message()"""
        def delayed_publish_cb():
            self.publish_message(routing_key, message)
        return delayed_publish_cb

    def schedule_next_message(self, cb, publish_interval=None):
        if self._stopping:
            return
        if publish_interval is None:
            publish_interval = self.PUBLISH_INTERVAL
        if -1 == publish_interval:
            return
        self._connection.add_timeout(publish_interval, cb)

    def publish_message(self, routing_key, message):
        if self._stopping:
            return
        properties = pika.BasicProperties(app_id=self._app_id,
                                          content_type='text/plain')
        self._channel.basic_publish(self.EXCHANGE, routing_key,
                                    message, properties)
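The if/elif chain in dispatch_message() could also be written as a lookup keyed on the message type. What follows is only a sketch in Python 3, not the code from the test programs: REPLIES, PRINT_ONLY, split_routing_key and build_reply are names made up for illustration, and the reply texts are copied from the snippet above. It only decides what to publish; the actual publishing is left out.

```python
# Hypothetical table-driven variant of dispatch_message(); none of these
# names come from pika or from the original test programs.

# message type -> (reply routing-key suffix, reply body), copied from the snippet
REPLIES = {
    "tutor-turn": ("input", "i don't know"),
    "nlu": ("nlu-response", "dnk"),
}

# message types that are only printed and never answered
PRINT_ONLY = {"login-response", "gid-assignment", "tutor-logout"}


def split_routing_key(routing_key):
    """Split '<user_id>.<msg_type>' on the last dot, as rsplit('.', 1) does above."""
    user_id, msg_type = routing_key.rsplit(".", 1)
    return user_id, msg_type


def build_reply(routing_key):
    """Return (reply_routing_key, reply_body), or None if nothing is published."""
    user_id, msg_type = split_routing_key(routing_key)
    if msg_type in REPLIES:
        suffix, body = REPLIES[msg_type]
        return "%s.%s" % (user_id, suffix), body
    if msg_type not in PRINT_ONLY:
        print("invalid message-type: %s" % msg_type)
    return None
```

Adding a new message type then means adding one entry to REPLIES rather than another elif branch.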
A connection is a TCP connection between your application and the RabbitMQ broker. A channel is a virtual connection inside a connection. In other words, a channel multiplexes a TCP connection. Typically, each process only creates one TCP connection, and uses multiple channels in that connection for different threads.
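Because every frame on the TCP connection carries a channel number, frames from different channels can be mixed freely, but within one channel a publish is expected as a fixed sequence: the Basic.Publish method frame, then a content header frame, then the body frames. The toy checker below is a simplified model of that rule, not broker or pika code; the (channel, kind, value) tuples and the check_frames function are assumptions made for illustration. It shows how a method frame landing on the same channel between a publish and its content header produces the kind of 505 error quoted in the question, while the same frame on a different channel is harmless.

```python
def check_frames(frames):
    """Validate a sequence of (channel, kind, value) frames.

    kind is "method" (value = method name), "header" (value = body size in
    bytes) or "body" (value = payload bytes carried by this frame).
    Returns None if the sequence is acceptable, otherwise an error string.
    """
    expecting = {}  # channel -> "header" or "body" while a publish is in flight
    remaining = {}  # channel -> body bytes still to arrive

    for channel, kind, value in frames:
        state = expecting.get(channel)
        if state == "header":
            if kind != "header":
                return ("505 UNEXPECTED_FRAME on channel %d: expected content "
                        "header, got %s frame" % (channel, kind))
            if value > 0:
                expecting[channel] = "body"
                remaining[channel] = value
            else:
                del expecting[channel]  # empty body, publish is complete
        elif state == "body":
            if kind != "body":
                return ("505 UNEXPECTED_FRAME on channel %d: expected content "
                        "body, got %s frame" % (channel, kind))
            remaining[channel] -= value
            if remaining[channel] <= 0:
                del expecting[channel]
                del remaining[channel]
        else:
            if kind != "method":
                return ("505 UNEXPECTED_FRAME on channel %d: unexpected %s "
                        "frame" % (channel, kind))
            if value == "Basic.Publish":
                expecting[channel] = "header"
    return None
```

In this model, an acknowledgement sent on channel 1 in the middle of a publish on channel 1 is rejected, whereas sending it on channel 2 passes.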
Single active consumer allows only one consumer at a time to consume from a queue, failing over to another registered consumer if the active one is cancelled or dies. Consuming with only one consumer is useful when messages must be consumed and processed in the same order they arrive in the queue.
You can use one channel for everything. However, if you have multiple threads, it's suggested to use a different channel for each thread. On channel thread-safety, the Java Client API Guide says: "Channel instances are safe for use by multiple threads."
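The channel-per-thread advice can be sketched with threading.local. This is a sketch under stated assumptions: FakeConnection is a stand-in for a client connection whose channel() call returns a new channel synchronously, and channel_for_current_thread is a hypothetical helper, not part of any client library. Keep in mind that pika documents its connections as not safe to share across threads, so with pika the same pattern would apply to a connection per thread rather than just a channel per thread.

```python
import threading


class FakeConnection(object):
    """Stand-in for a client connection; hands out numbered channels."""

    def __init__(self):
        self._lock = threading.Lock()
        self._next_number = 1

    def channel(self):
        # Allocate channel numbers under a lock so two threads never share one.
        with self._lock:
            number = self._next_number
            self._next_number += 1
        return {"number": number, "opened_by": threading.current_thread().name}


# Each thread sees its own attributes on this object.
_local = threading.local()


def channel_for_current_thread(connection):
    """Open one channel per thread on first use and reuse it afterwards."""
    if not hasattr(_local, "channel"):
        _local.channel = connection.channel()
    return _local.channel
```

Each thread that calls channel_for_current_thread() gets its own channel and keeps getting the same one on later calls.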
The AMQP 1.0 specification treats a channel as unidirectional:
An AMQP Session correlates two unidirectional Channels to form a bidirectional, sequential conversation between two Containers. A single Connection may have multiple independent Sessions active simultaneously, up to the negotiated Channel limit. Both Connections and Sessions are modeled by each peer as endpoints that store local and last known remote state regarding the Connection or Session in question.
Under that model you would use one input and one output channel for your application. Note, however, that RabbitMQ and pika speak AMQP 0-9-1, in which a single channel carries frames in both directions, so this passage does not by itself require separate channels for consuming and publishing.
I did my commits, was about to go to bed, and suddenly figured it out. It turns out that the Python tutorials on rabbitmq.com still say to install pika with:
sudo pip install pika==0.9.8
While 0.9.8 came out sometime in 2012, I think the fix was added after that release; 0.9.9 was released sometime in 2013.
So, I did:
sudo pip uninstall pika
followed by the installation instructions on the pika site:
sudo pip install pika
Then I replaced all of my connection.add_timeout(1, delayed_publish_cb) calls with basic_publish(), crossed my fingers, and ran it. My two processes exchanged about 200,000 messages with each other in less than 5 minutes without any problems.
Good to know that the bug fix from back in 2012 still works.
I'll have to let the RabbitMQ folks know to update their tutorial.
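Since the behaviour depended on the installed pika release, a quick version check can confirm which release a process is actually running. A small sketch: version_tuple and is_newer_than are hypothetical helpers, and 0.9.8 is used as the threshold only because that is the release that showed the problem here. With pika installed, the string to pass in is pika.__version__.

```python
def version_tuple(text):
    """Turn '0.9.14' into (0, 9, 14); suffixes such as 'b1' are ignored."""
    parts = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits) if digits else 0)
    return tuple(parts)


def is_newer_than(installed, known_bad="0.9.8"):
    """True if the installed version string is newer than the known-bad one."""
    return version_tuple(installed) > version_tuple(known_bad)
```

Comparing tuples of integers avoids the trap of comparing version strings directly, where "0.9.14" would sort before "0.9.8".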