
Asynchronous RabbitMQ consumer with aioamqp

I'm trying to write an asynchronous consumer using asyncio and aioamqp. My problem is that the callback coroutine (below) blocks. I call basic_consume() on the channel and pass callback() as the callback. To simulate "work", the callback takes an integer sent by the publisher, does a "yield from asyncio.sleep" for that many seconds, and then prints the message.

If I publish two messages, one with a time of "10" immediately followed by one with a time of "1", I expect the second message to print first, since it has the shorter sleep time. Instead, the callback blocks for 10 seconds, prints the first message, and then prints the second.

It appears that either basic_consume or the callback is blocking somewhere. Is there another way this could be handled?

import asyncio
import random

import aioamqp


@asyncio.coroutine
def callback(body, envelope, properties):
    yield from asyncio.sleep(int(body))
    print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag))

@asyncio.coroutine
def receive_log():
    try:
        transport, protocol = yield from aioamqp.connect('localhost', 5672, login="login", password="password")
    except aioamqp.AmqpClosedConnection:
        print("closed connections")
        return

    channel = yield from protocol.channel()
    exchange_name = 'test-async-exchange'
    queue_name = 'async-queue-%s' % random.randint(0, 10000)
    yield from channel.exchange(exchange_name, 'topic', auto_delete=True, passive=False, durable=False)
    yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=True), timeout=10)

    binding_keys = ['mykey']

    for binding_key in binding_keys:
        print("binding", binding_key)
        yield from asyncio.wait_for(channel.queue_bind(exchange_name=exchange_name,
                                                       queue_name=queue_name,
                                                       routing_key=binding_key), timeout=10)

    print(' [*] Waiting for logs. To exit press CTRL+C')
    yield from channel.basic_consume(queue_name, callback=callback)

loop = asyncio.get_event_loop()
loop.create_task(receive_log())
loop.run_forever()
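
The ordering described above can be reproduced without a broker. This is only a sketch under an assumption: that the consumer awaits each callback before handing over the next delivery, which is consistent with what I observed but is not taken from the aioamqp source. serial_dispatch() is a hypothetical stand-in for the library's dispatch loop, the timings are scaled down from "10" and "1", and the code uses async/await with asyncio.run() (Python 3.7+) rather than @asyncio.coroutine.

```python
import asyncio


async def callback(body, results):
    # Simulated "work": sleep for the number of seconds carried in the message.
    await asyncio.sleep(float(body))
    results.append(body)


async def serial_dispatch(messages):
    """Hypothetical stand-in for a consumer that awaits each callback in turn."""
    results = []
    for body in messages:
        # The next message is not handed over until this callback has finished.
        await callback(body, results)
    return results


# Scaled-down timings: 0.2 s and 0.02 s stand in for "10" and "1".
serial_order = asyncio.run(serial_dispatch(["0.2", "0.02"]))
print(serial_order)  # ['0.2', '0.02']
```

Under that assumption the short message can never overtake the long one, no matter how small its sleep is, because its callback does not start until the first one has returned.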
blindsnowmobile asked Jul 09 '15 17:07

1 Answer

For those interested, I figured out a way to do this. I'm not sure if it's best practice, but it's accomplishing what I need.

Rather than doing the "work" (in this case, asyncio.sleep) inside the callback, I create a new task on the loop that runs a separate coroutine, do_work(). Presumably this works because callback() now returns immediately, which frees the consumer to pick up the next message.

I loaded a few hundred events into RabbitMQ with different sleep timers, and their output was interleaved when printed by the code below, so it seems to be working. Hope this helps someone!

import asyncio
import random

import aioamqp


@asyncio.coroutine
def do_work(envelope, body):
    yield from asyncio.sleep(int(body))
    print("consumer {} recved {} ({})".format(envelope.consumer_tag, body, envelope.delivery_tag))

@asyncio.coroutine
def callback(body, envelope, properties):
    loop = asyncio.get_event_loop()
    loop.create_task(do_work(envelope, body))

@asyncio.coroutine
def receive_log():
    try:
        transport, protocol = yield from aioamqp.connect('localhost', 5672, login="login", password="password")
    except aioamqp.AmqpClosedConnection:
        print("closed connections")
        return

    channel = yield from protocol.channel()
    exchange_name = 'test-async-exchange'
    queue_name = 'async-queue-%s' % random.randint(0, 10000)
    yield from channel.exchange(exchange_name, 'topic', auto_delete=True, passive=False, durable=False)
    yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=True), timeout=10)

    binding_keys = ['mykey']

    for binding_key in binding_keys:
        print("binding", binding_key)
        yield from asyncio.wait_for(channel.queue_bind(exchange_name=exchange_name,
                                                       queue_name=queue_name,
                                                       routing_key=binding_key), timeout=10)

    print(' [*] Waiting for logs. To exit press CTRL+C')
    yield from channel.basic_consume(queue_name, callback=callback)

loop = asyncio.get_event_loop()
loop.create_task(receive_log())
loop.run_forever()
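
For anyone on a newer Python, here is a broker-free sketch of the same idea with async/await (the @asyncio.coroutine decorator was removed in Python 3.11). It is an illustration, not aioamqp code: the results list, the background_tasks set and the main() driver are hypothetical names added for the demo, and the timings are scaled down. It also keeps a reference to each task, which the asyncio documentation recommends so that a running task is not garbage-collected.

```python
import asyncio

# Hypothetical registry holding strong references to in-flight tasks.
background_tasks = set()


async def do_work(body, results):
    # Simulated "work": sleep for the number of seconds carried in the message.
    await asyncio.sleep(float(body))
    results.append(body)


async def callback(body, results):
    # Schedule the work and return at once instead of awaiting it here.
    task = asyncio.create_task(do_work(body, results))
    background_tasks.add(task)
    # Drop the reference once the task has finished.
    task.add_done_callback(background_tasks.discard)


async def main(messages):
    results = []
    for body in messages:
        # Each callback returns immediately, so both sleeps overlap.
        await callback(body, results)
    # Wait for the scheduled work so the demo can report the completion order.
    await asyncio.gather(*background_tasks)
    return results


# Scaled-down timings: 0.2 s and 0.02 s stand in for "10" and "1".
task_order = asyncio.run(main(["0.2", "0.02"]))
print(task_order)  # ['0.02', '0.2']
```

The shorter sleep now finishes first, which matches the interleaving I saw with the real queue. One thing to keep in mind with this pattern is that nothing limits how many tasks are in flight at once.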
blindsnowmobile answered Oct 31 '22 04:10