Can eventlet manage an AMQP connection with messages passing asynchronously both in and out?

Actual Design:

For those returning to this question, the helpful answer below pushed me towards a workable design that is running fine. Three insights were key:

  1. Eventlet is a very safe environment — if two greenlets both try recv() from or both try send() to the same socket simultaneously, then Eventlet elegantly kills the second greenlet with an exception. This is magnificent and means that simple exceptions, and not impossible-to-reproduce data interleaving errors, will result if amqplib “greens” poorly.
  2. The amqplib methods fall roughly into two groups: wait() loops inside of recv() until an AMQP message is assembled, while other methods send() messages back and will not attempt their own recv(). This is stunningly good luck, given that the amqplib authors had no idea that someone would try to “green” their library! It means that message sending is not only safe from the callback invoked by wait(), but that messages can also be sent safely from other greenlets that are completely outside the control of the wait() loop. These safe methods — that can be called from any greenlet, not just from the wait() callback — are:
    1. basic_ack
    2. basic_consume with nowait=True
    3. basic_publish
    4. basic_recover
    5. basic_reject
    6. exchange_declare with nowait=True
    7. exchange_delete with nowait=True
    8. queue_bind with nowait=True
    9. queue_unbind with nowait=True
    10. queue_declare with nowait=True
    11. queue_delete with nowait=True
    12. queue_purge with nowait=True
  3. Semaphores can be used as locks: initialize the semaphore with the count 1 and then acquire() and release() to lock and unlock. All of my async greenlets that want to write messages can use such a lock to avoid having their separate send() calls interleave and ruin the AMQP protocol.
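The lock idiom in point 3 can be sketched with the standard library's threading.Semaphore, whose acquire()/release()/context-manager interface eventlet.semaphore.Semaphore shares (plain threads stand in for greenlets here, purely for illustration):

```python
import threading

lock = threading.Semaphore(1)  # a count of 1 makes the semaphore act as a mutex

counter = 0

def bump(n):
    global counter
    for _ in range(n):
        with lock:           # acquire() on entry, release() on exit
            counter += 1

threads = [threading.Thread(target=bump, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 40000: the lock kept the increments from interleaving
```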

So my code looks roughly like this:

import eventlet
import eventlet.semaphore

amqp = eventlet.patcher.import_patched('amqplib.client_0_8')

class Processor(object):
    def __init__(self):
        self.write_lock = eventlet.semaphore.Semaphore(1)

    def listening_greenlet(self):
        # start this using eventlet.spawn_n()
        # create the Connection and self.channel here
        self.channel.basic_consume(queue, callback=self.consume)
        while True:
            self.channel.wait()

    def safe_publish(self, *args, **kw):
        with self.write_lock:  # yes, Eventlet supports this!
            self.channel.basic_publish(*args, **kw)

    def consume(self, message):
        # Returning immediately frees the wait() loop
        eventlet.spawn_n(self.process, message)

    def process(self, message):
        # do whatever I want
        # whenever I am done, I can async reply:
        self.safe_publish(...)

Enjoy!

Original Question:

Imagine hundreds of AMQP messages arriving each minute at a small Python Eventlet application, each of which needs to be processed and answered, where the CPU overhead of the processing will be minimal but might involve waiting on answers from other services and sockets.

To allow, say, 100 messages to be processed at once, I could of course spin up 100 separate TCP connections to RabbitMQ and have a worker for each connection that receives, processes, and answers single messages in lock-step. But to conserve TCP connections I would prefer to create just one AMQP connection, allow RabbitMQ to stream messages down the pipe at me at full speed, hand those tasks off to workers, and send answers back when each worker completes:

                                       +--------+
                                +------| worker | <-+
                                |      +--------+   |
                                |      +--------+   |
                                | +----| worker | <-+
                                | |    +--------+   |
                                | |    +--------+   |
                                | | +--| worker | <-+
                                | | |  +--------+   |
                                v v v               |
                           +------------+           |
 RabbitMQ <-AMQP-> socket--| dispatcher |-----------+
                           +------------+

Observe that:

  • An Eventlet queue could elegantly distribute incoming work among the workers as they become available for more work.
  • Flow control from RabbitMQ might even be possible: I can ACK messages only until my workers are all busy, and then wait before sending further ACKs until the queue starts to empty.
  • Work will be almost certainly completed out-of-order: one request might finish quickly while another event that arrived earlier takes much longer; and some requests might never complete at all; so the workers will be handing back responses in an unpredictable and asynchronous order.
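The flow-control idea in the second bullet falls out naturally if the dispatcher hands work to workers through a bounded queue: put() blocks once the queue is full, so the dispatcher stops pulling (and ACKing) until workers catch up. A rough stdlib sketch, with queue.Queue standing in for eventlet.queue.Queue and a single thread standing in for a pool of worker greenlets:

```python
import queue
import threading

# A bounded queue: put() blocks once `maxsize` items are waiting, which is
# exactly the backpressure the dispatcher needs before ACKing more messages.
work = queue.Queue(maxsize=2)

results = []

def worker():
    while True:
        msg = work.get()
        if msg is None:          # sentinel: shut down
            break
        results.append(msg * 2)  # stand-in for real processing
        work.task_done()

t = threading.Thread(target=worker)
t.start()

for msg in range(5):
    work.put(msg)    # blocks whenever two items are already queued

work.put(None)
t.join()
print(results)  # [0, 2, 4, 6, 8]
```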

I had been planning to write this using Eventlet and py-amqplib after seeing this attractive blog post about how easily that AMQP library could be pulled into the Eventlet processing model:

http://blog.eventlet.net/2010/02/09/multiple-concurrent-connections-with-py-amqplib-and-eventlet/

My problem is that, having read the documentation for both libraries, the amqplib source code, and much of the Eventlet source code, I cannot figure out how I can teach the eventlet that owns the AMQP connection — the eventlet named connect_to_host() in the blog post — to also wake up when a worker completes its work and generates an answer. The wait() method in amqplib can only be awoken through activity on the AMQP socket. Though it feels like I ought to be able to have the workers write their answers to a queue, and have the connect_to_host() eventlet wake up either when a new incoming message arrives or when a worker is ready with an answer to send, I cannot find any way for an eventlet to say “wake me when either of these things happens.”
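For what it is worth, the "wake me when either of these things happens" effect can be approximated by funneling both event sources into one queue that a single consumer blocks on. Here is a stdlib threading sketch of that idea; threads stand in for greenlets, and the names amqp_listener and worker_replies are made up for illustration:

```python
import queue
import threading

events = queue.Queue()   # the single queue that both sources feed

def amqp_listener():
    for i in range(3):
        events.put(('incoming', i))   # stand-in for messages from wait()

def worker_replies():
    for i in range(3):
        events.put(('reply', i))      # stand-in for workers finishing

threads = [threading.Thread(target=amqp_listener),
           threading.Thread(target=worker_replies)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The single consumer wakes for either kind of event, in whatever
# order the two sources happened to produce them.
seen = [events.get() for _ in range(6)]
print(sorted(seen))
```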

It did occur to me that the workers could try commandeering the AMQP connection object — or even the raw socket — and writing their own messages back over TCP; but it seems as though locks would be necessary to prevent the outgoing worker messages from getting interleaved with each other or with ACK messages written by the main listener eventlet, and I cannot find where locks are available in Eventlet either.

All of this makes me feel almost certain that I am trying to tackle this problem somehow exactly backwards. Does a problem like this — letting a single connection be safely shared between a listener-dispatcher and many workers — simply not map to a coroutine model, and require a full-fledged async library? (In which case: is there one you would recommend for this problem, and how would the multiplexing take place between incoming messages and outgoing worker responses? I found no clean solution earlier today trying combinations like Pika + ioloop — though I have just seen another library, stormed_amqp, that might do better than Pika did.) Or do I actually need to fall back on real live Python threads if I want clean and maintainable code that can enact this model? I am open to all options.

Thanks for any help or ideas! I keep thinking that I have the whole concurrency-in-Python thing pretty much down, then I learn yet again that I do not. :) And I hope you liked the ASCII art above in any case.

asked Nov 02 '11 by Brandon Rhodes
1 Answer

After reading your post and working with gevent (a library similar to Eventlet), a few things became clear to me, because I just solved a similar problem.

In general there is no need for locking, since only one eventlet or greenlet ever runs at a time; as long as none of them blocks, everything merely appears to run simultaneously. BUT you don't want to send data down a socket while another greenlet is still sending to it. You are right: you do need a lock for that.

When the documentation isn't enough for questions like these, go look in the source! It's open source anyway, and you learn a ton more by reading other people's code.

Here is some simplified example code that might clear things up for you.

In your dispatcher, keep two queues:

self.worker_queue = Queue()  # queue for messages to workers
self.server_queue = Queue()  # queue for messages to the amqp server

Have your workers put their results on the server queue.

The sending and receiving code:

def send_into_amqp():
    while True:
        message = dispatcher.get_worker_msg()

        try:
            connection.send(self.encode(message))
        except Exception:
            connection.kill()

def read_from_amqp():
    while True:
        message = connection.wait()

        dispatcher.put_amqp_msg(self.decode(message))

In your connection code's send() function:

self._writelock = Semaphore(1)
# This is gevent's locking primitive; Eventlet has the equivalent
# eventlet.semaphore.Semaphore(value=1). It counts down on each
# acquire and up on each release, blocking when the count hits zero.

def send(self, message):
    """
    You need a write lock to prevent other greenlets from
    sending more messages while a previous send is unfinished.
    """
    with self._writelock:
        self.socket.sendall(message)
answered Sep 25 '22 by Stephan