Actual Design:
For those returning to this question, the helpful answer below pushed me towards a workable design that is running fine. Three insights were key:
1. If two greenlets both try to recv() from, or both try to send() to, the same socket simultaneously, then Eventlet elegantly kills the second greenlet with an exception. This is magnificent and means that simple exceptions, and not impossible-to-reproduce data interleaving errors, will result if amqplib "greens" poorly.

2. amqplib methods fall roughly into two groups: wait() loops inside of recv() until an AMQP message is assembled, while the other methods send() messages back and never attempt their own recv(). This is stunningly good luck, given that the amqplib authors had no idea that someone would try to "green" their library! It means that message sending is not only safe from the callback invoked by wait(), but that messages can also be sent safely from other greenlets that are completely outside the control of the wait() loop. These safe methods, which can be called from any greenlet and not just from the wait() callback, are:
basic_ack
basic_consume with nowait=True
basic_publish
basic_recover
basic_reject
exchange_declare with nowait=True
exchange_delete with nowait=True
queue_bind with nowait=True
queue_unbind with nowait=True
queue_declare with nowait=True
queue_delete with nowait=True
queue_purge with nowait=True
3. Eventlet provides semaphores: I can create an eventlet.semaphore.Semaphore(1) and then acquire() and release() it to lock and unlock. All of my async greenlets that want to write messages can use such a lock to avoid having their separate send() calls interleave and ruin the AMQP protocol.

So my code looks roughly like this:
import eventlet
import eventlet.semaphore

amqp = eventlet.patcher.import_patched('amqplib.client_0_8')

class Processor(object):
    def __init__(self):
        self.write_lock = eventlet.semaphore.Semaphore(1)

    def listening_greenlet(self):
        # start this using eventlet.spawn_n()
        # create the Connection and self.channel, then consume
        # from your queue (here just called `queue`)
        self.channel.basic_consume(queue, callback=self.consume)
        while True:
            self.channel.wait()

    def safe_publish(self, *args, **kw):
        with self.write_lock:  # yes, Eventlet supports this!
            self.channel.basic_publish(*args, **kw)

    def consume(self, message):
        # Returning immediately frees the wait() loop
        eventlet.spawn_n(self.process, message)

    def process(self, message):
        # do whatever I want
        # whenever I am done, I can async reply:
        self.safe_publish(...)
Enjoy!
Original Question:
Imagine hundreds of AMQP messages arriving each minute at a small Python Eventlet application, each of which needs to be processed and answered, where the CPU overhead of the processing will be minimal but might involve waiting on answers from other services and sockets.
To allow, say, 100 messages to be processed at once, I could of course spin up 100 separate TCP connections to RabbitMQ and have a worker for each connection that receives, processes, and answers single messages in lock-step. But to conserve TCP connections I would prefer to create just one AMQP connection, allow RabbitMQ to stream messages down the pipe at me at full speed, hand those tasks off to workers, and send answers back when each worker completes:
+--------+
+------| worker | <-+
| +--------+ |
| +--------+ |
| +----| worker | <-+
| | +--------+ |
| | +--------+ |
| | +--| worker | <-+
| | | +--------+ |
v v v |
+------------+ |
RabbitMQ <-AMQP-> socket--| dispatcher |-----------+
+------------+
I had been planning to write this using Eventlet and py-amqplib after seeing this attractive blog post about how easily that AMQP library could be pulled into the Eventlet processing model:
http://blog.eventlet.net/2010/02/09/multiple-concurrent-connections-with-py-amqplib-and-eventlet/
My problem is that, having read the documentation for both libraries, the amqplib source code, and much of the Eventlet source code, I cannot figure out how I can teach the eventlet that owns the AMQP connection — the eventlet named connect_to_host()
in the blog post — to also wake up when a worker completes its work and generates an answer. The wait()
method in amqplib can only be awoken through activity on the AMQP socket. Though it feels like I ought to be able to have the workers write their answers to a queue, and have the connect_to_host()
eventlet wake up either when a new incoming message arrives or when a worker is ready with an answer to send, I cannot find any way for an eventlet to say “wake me when either of these things happens.”
It did occur to me that the workers could try commandeering the AMQP connection object — or even the raw socket — and writing their own messages back over TCP; but it seems as though locks would be necessary to prevent the outgoing worker messages from getting interleaved with each other or with ACK messages written by the main listener eventlet, and I cannot find where locks are available in Eventlet either.
All of this makes me feel almost certain that I am trying to tackle this problem somehow exactly backwards. Does a problem like this — letting a single connection be safely shared between a listener-dispatcher and many workers — simply not map to a coroutine model, and require a full-fledged async library? (In which case: is there one you would recommend for this problem, and how would the multiplexing take place between incoming messages and outgoing worker responses? I found no clean solution earlier today trying combinations like Pika + ioloop — though I have just seen another library, stormed_amqp, that might do better than Pika did.) Or do I actually need to fall back on real live Python threads if I want clean and maintainable code that can enact this model? I am open to all options.
Thanks for any help or ideas! I keep thinking that I have the whole concurrency-in-Python thing pretty much down, then I learn yet again that I do not. :) And I hope you liked the ASCII art above in any case.
After reading your post and working with gevent (a library similar to Eventlet), a few things became clear to me, because I just solved a similar problem.
In general there is no need for locking, since only one eventlet or greenlet ever runs at a time; as long as none of them blocks, everything seems to run simultaneously. BUT you don't want to send data down a socket while another greenlet is sending to it too. You are right, and you do indeed need locking for that.
If I have questions like these, looking in the documentation is not enough: go look in the source! It is open source anyway, and you learn a ton more by looking at other people's code.
Here is some simplified example code that might clear things up for you.
In your dispatcher, keep two queues:
self.worker_queue = Queue()  # queue for messages to workers
self.server_queue = Queue()  # queue for messages to the AMQP server
Have workers put their results on the server queue.
The sending and receiving code:
def send_into_amqp():
    while True:
        message = dispatcher.get_server_msg()  # blocks until a worker replies
        try:
            connection.send(self.encode(message))
        except Exception:
            connection.kill()

def read_from_amqp():
    while True:
        message = connection.wait()
        dispatcher.put_worker_msg(self.decode(message))
In your connection code's send() method:

self._writelock = Semaphore(1)
# This is gevent's locking primitive, but Eventlet has the same thing:
# eventlet.semaphore.Semaphore(value=1). It counts down for each
# acquire and up for each release, and blocks whenever the count
# would drop below zero.

def send(self, message):
    """
    You need a write lock to prevent more greenlets from sending
    messages while a previous send is not yet done.
    """
    with self._writelock:
        self.socket.sendall(message)