Asynchronous RabbitMQ consumer with aioamqp

I'm trying to write an asynchronous consumer using asyncio and aioamqp. My problem is that the callback coroutine (below) blocks. I call basic_consume() on the channel and pass callback() as the callback. To simulate "work", the callback reads an integer from the message body and runs "yield from asyncio.sleep" for that many seconds before printing the message.

If I publish two messages, one with a time of "10" immediately followed by one with a time of "1", I expect the second message to print first, since it has the shorter sleep time. Instead, the callback blocks for 10 seconds, prints the first message, and only then handles and prints the second.

It appears that either basic_consume or the callback is blocking somewhere. Is there another way to handle this?

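import asyncio
import random

import aioamqp

# Generator-based coroutine style (asyncio.coroutine was removed in Python 3.11).
@asyncio.coroutine
def callback(body, envelope, properties):
    # Simulate work: sleep for the number of seconds given in the message body.
    yield from asyncio.sleep(int(body))
    print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag))

@asyncio.coroutine
def receive_log():
    try:
        transport, protocol = yield from aioamqp.connect('localhost', 5672, login="login", password="password")
    except aioamqp.AmqpClosedConnection:
        print("closed connections")
        return

    channel = yield from protocol.channel()
    exchange_name = 'test-async-exchange'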
    queue_name = 'async-queue-%s' % random.randint(0, 10000)
    yield from channel.exchange(exchange_name, 'topic', auto_delete=True, passive=False, durable=False)
    yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=True), timeout=10)

    binding_keys = ['mykey']

    for binding_key in binding_keys:
        print("binding", binding_key)
        yield from asyncio.wait_for(channel.queue_bind(queue_name=queue_name,
                                                       exchange_name=exchange_name,
                                                       routing_key=binding_key), timeout=10)

    print(' [*] Waiting for logs. To exit press CTRL+C')
    yield from channel.basic_consume(queue_name, callback=callback)

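loop = asyncio.get_event_loop()
loop.run_until_complete(receive_log())
# Keep the loop alive so the consumer keeps receiving messages.
loop.run_forever()

1 Answer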

For those interested, I figured out a way to do this. I'm not sure if it's best practice, but it accomplishes what I need.

Rather than doing the "work" (in this case, asyncio.sleep) inside the callback, I create a new task on the loop that runs a separate coroutine, do_work(). Presumably this works because it frees callback() to return immediately, so the next message can be delivered while the earlier work is still sleeping.
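
The effect can be reproduced without a broker. The sketch below is a stand-in, not aioamqp itself: dispatch_awaiting() is a hypothetical consumer loop that awaits each callback before handling the next message, which is assumed here (not verified) to be roughly how the library delivers messages. The names dispatch_awaiting, run_blocking, run_with_tasks and the scale parameter are made up for illustration, and the code uses the newer async/await syntax rather than yield from.

```python
import asyncio


async def dispatch_awaiting(messages, callback):
    # Hypothetical consumer loop: waits for each callback to finish
    # before it hands over the next message.
    for body in messages:
        await callback(body)


async def run_blocking(messages, scale=0.01):
    done = []

    async def callback(body):
        # The "work" happens inside the callback, so the loop above stalls.
        await asyncio.sleep(int(body) * scale)
        done.append(body)

    await dispatch_awaiting(messages, callback)
    return done


async def run_with_tasks(messages, scale=0.01):
    done = []
    tasks = []

    async def do_work(body):
        await asyncio.sleep(int(body) * scale)
        done.append(body)

    async def callback(body):
        # Schedule the work and return immediately.
        tasks.append(asyncio.create_task(do_work(body)))

    await dispatch_awaiting(messages, callback)
    await asyncio.gather(*tasks)  # wait for the scheduled work to finish
    return done


def demo():
    blocking = asyncio.run(run_blocking(["10", "1"]))
    scheduled = asyncio.run(run_with_tasks(["10", "1"]))
    return blocking, scheduled


if __name__ == "__main__":
    print(demo())  # (['10', '1'], ['1', '10'])
```

With the work inside the callback, the messages finish in arrival order. With the work scheduled as tasks, the shorter sleep finishes first, which matches the interleaving described in this answer.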

I loaded up a few hundred events in Rabbit with different sleep timers, and they were interleaved when printed by the code below. So it seems to be working. Hope this helps someone!

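import asyncio
import random

import aioamqp

# Generator-based coroutine style (asyncio.coroutine was removed in Python 3.11).
@asyncio.coroutine
def do_work(envelope, body):
    # Simulate work: sleep for the number of seconds given in the message body.
    yield from asyncio.sleep(int(body))
    print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag))

@asyncio.coroutine
def callback(body, envelope, properties):
    # Schedule the work as its own task so the callback returns right away.
    loop = asyncio.get_event_loop()
    loop.create_task(do_work(envelope, body))

@asyncio.coroutine
def receive_log():
    try:
        transport, protocol = yield from aioamqp.connect('localhost', 5672, login="login", password="password")
    except aioamqp.AmqpClosedConnection:
        print("closed connections")
        return

    channel = yield from protocol.channel()
    exchange_name = 'test-async-exchange'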
    queue_name = 'async-queue-%s' % random.randint(0, 10000)
    yield from channel.exchange(exchange_name, 'topic', auto_delete=True, passive=False, durable=False)
    yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=True), timeout=10)

    binding_keys = ['mykey']

    for binding_key in binding_keys:
        print("binding", binding_key)
        yield from asyncio.wait_for(channel.queue_bind(queue_name=queue_name,
                                                       exchange_name=exchange_name,
                                                       routing_key=binding_key), timeout=10)

    print(' [*] Waiting for logs. To exit press CTRL+C')
    yield from channel.basic_consume(queue_name, callback=callback)

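loop = asyncio.get_event_loop()
loop.run_until_complete(receive_log())
# Keep the loop alive so the scheduled do_work() tasks can run.
loop.run_forever()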
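
One caveat with fire-and-forget tasks: the asyncio documentation notes that the event loop keeps only weak references to tasks, and an exception raised inside a task that nobody awaits is easy to miss. A possible way to handle both, sketched with the newer async/await syntax, is to keep the tasks in a set and record failures in a done callback. TaskTracker, spawn() and drain() are hypothetical names for this example and are not part of aioamqp or asyncio.

```python
import asyncio


class TaskTracker:
    """Hypothetical helper: holds references to in-flight tasks and records failures."""

    def __init__(self):
        self.pending = set()
        self.errors = []

    def spawn(self, coro):
        # Must be called while the event loop is running (e.g. from a callback).
        task = asyncio.create_task(coro)
        self.pending.add(task)  # strong reference until the task finishes
        task.add_done_callback(self._finished)
        return task

    def _finished(self, task):
        self.pending.discard(task)
        if not task.cancelled() and task.exception() is not None:
            self.errors.append(task.exception())

    async def drain(self):
        # Wait until every task spawned so far has finished.
        while self.pending:
            await asyncio.gather(*list(self.pending), return_exceptions=True)


async def do_work(body, results):
    await asyncio.sleep(int(body) * 0.01)  # int() fails for a malformed body
    results.append(body)


async def main():
    tracker = TaskTracker()
    results = []
    for body in ["3", "1", "oops", "2"]:
        tracker.spawn(do_work(body, results))
    await tracker.drain()
    return results, tracker.errors


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In the consumer above, callback() would call something like tracker.spawn(do_work(envelope, body)) in place of loop.create_task(...), assuming a tracker object created once at startup.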
