
What's the best pattern to design an asynchronous RPC application using Python, Pika and AMQP?

The producer module of my application is run by users who want to submit work to be done on a small cluster. It sends the job submissions as JSON through the RabbitMQ message broker.

I have tried several strategies, and the best so far is the following, which is still not fully working:

Each cluster machine runs a consumer module, which subscribes to the AMQP queue and sets a prefetch_count to tell the broker how many tasks it can run at once.

I was able to make it work using SelectConnection from the Pika AMQP library. Both consumer and producer open two channels, one connected to each queue. The producer sends requests on channel [A] and waits for responses on channel [B], and the consumer waits for requests on channel [A] and sends responses on channel [B]. It seems, however, that the consumer blocks while it runs the callback that calculates the response, so each consumer executes only one task at a time.

What I need in the end:

  1. the producer submits its tasks (around 5k at a time) to the cluster
  2. the broker dispatches N messages/requests to each consumer, where N is the number of concurrent tasks it can handle
  3. when a single task is finished, the consumer replies to the broker/producer with the result
  4. the producer receives the replies, updates the computation status and, in the end, prints some reports

Restrictions:

  • If another user submits work, all of their tasks will be queued after the previous user's (I guess this follows automatically from the queue system, but I haven't thought about the implications in a threaded environment)
  • Tasks must be submitted in order, but the order in which the replies arrive is not important

UPDATE

I have looked into this a bit further, and my actual problem seems to be that I pass a plain function as the callback to pika's SelectConnection.channel.basic_consume(). My latest (unimplemented) idea is to pass a function that hands the work off to a thread instead, so the callback would not block and the consumer could keep listening.

guhcampos asked Sep 13 '11 14:09




1 Answer

As you have noticed, your process blocks when it runs a callback. There are several ways to deal with this depending on what your callback does.

If your callback is IO-bound (doing lots of networking or disk IO) you can use either threads or a greenlet-based solution, such as gevent, eventlet, or greenhouse. Keep in mind, though, that CPython is limited by the GIL (Global Interpreter Lock), which means that only one thread executes Python bytecode at any moment within a single Python process. So if your callback spends most of its time computing in Python code, these solutions will likely not be much faster than what you already have.
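A minimal sketch of the threaded variant, under some assumptions: it targets the pika 1.x API, it uses BlockingConnection rather than SelectConnection to keep the example short, and `do_work`, the `tasks` queue name and the `localhost` broker are hypothetical placeholders. The callback only hands the message to a thread pool and returns, so the connection keeps receiving. Because pika connections are not thread-safe, the worker thread schedules the publish and the ack back onto the connection's own thread with `add_callback_threadsafe` (with SelectConnection, I believe the equivalent is `connection.ioloop.add_callback_threadsafe`).

```python
import functools
from concurrent.futures import ThreadPoolExecutor


def do_work(body: bytes) -> bytes:
    """Hypothetical IO-bound task; replace with the real job."""
    return body.upper()


def reply(channel, method, properties, result):
    """Publish the result and ack the request (runs on the connection's thread)."""
    channel.basic_publish(exchange="", routing_key=properties.reply_to, body=result)
    channel.basic_ack(delivery_tag=method.delivery_tag)


def make_on_request(connection, executor, work=do_work):
    """Build a basic_consume callback that returns immediately."""
    def on_request(channel, method, properties, body):
        def run():
            result = work(body)
            # Channels must only be used from the connection's thread,
            # so ask the connection to call reply() for us.
            connection.add_callback_threadsafe(
                functools.partial(reply, channel, method, properties, result))
        executor.submit(run)
    return on_request


def main(queue="tasks", concurrency=4):
    # Imported lazily so the helpers above can be exercised without a broker.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue=queue)
    # Let the broker hand this consumer up to `concurrency` unacked messages.
    channel.basic_qos(prefetch_count=concurrency)
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        channel.basic_consume(
            queue=queue,
            on_message_callback=make_on_request(connection, executor))
        channel.start_consuming()
```

Calling `main()` on each cluster machine would start one such consumer. Matching `prefetch_count` to the pool size means the broker never sends more requests than there are threads to run them. A full RPC reply would normally also copy the request's `correlation_id` into the reply properties; that is left out here for brevity.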

Another option would be to implement your consumer as multiple processes using multiprocessing. I have found multiprocessing to be very useful when doing parallel work. You could implement this either by using a Queue, with the parent process acting as the consumer and farming out work to its children, or by simply starting up multiple processes which each consume on their own. Unless your application is highly concurrent (1000s of workers), I would suggest simply starting multiple workers, each of which consumes over its own connection. This way you can use the acknowledgement feature of AMQP: if a consumer dies before acknowledging a task it is still processing, the broker returns the message to the queue automatically and another worker picks it up, rather than the request simply being lost.
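Here is a rough sketch of the "multiple workers, each with its own connection" layout, again assuming the pika 1.x API. The `tasks` queue name, the `localhost` broker, and the JSON task shape (`{"n": ...}` in, `{"result": ...}` out) are made up for illustration. Each process runs its callback synchronously and acknowledges only after the reply has been published, so a worker that dies mid-task leaves its message unacknowledged for the broker to redeliver.

```python
import json
import multiprocessing


def handle(body: bytes) -> bytes:
    """Hypothetical CPU-bound task: sum of squares below task["n"]."""
    task = json.loads(body)
    result = sum(i * i for i in range(task["n"]))
    return json.dumps({"result": result}).encode()


def on_message(channel, method, properties, body):
    """Blocking callback; that is fine because each process handles one task."""
    result = handle(body)
    channel.basic_publish(exchange="", routing_key=properties.reply_to, body=result)
    # Ack last: if the process dies before this line, the broker requeues the task.
    channel.basic_ack(delivery_tag=method.delivery_tag)


def worker(queue="tasks"):
    # Imported lazily so the helpers above can be exercised without a broker.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue=queue)
    # One unacked message per process; concurrency comes from the process count.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue=queue, on_message_callback=on_message)
    channel.start_consuming()


def start_workers(count, queue="tasks"):
    """Start `count` independent consumer processes and return them."""
    procs = [multiprocessing.Process(target=worker, args=(queue,))
             for _ in range(count)]
    for proc in procs:
        proc.start()
    return procs
```

On a machine that can run N tasks at once, something like `start_workers(multiprocessing.cpu_count())` gives the broker N consumers with a prefetch of one each, which sidesteps the GIL for compute-heavy callbacks.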

A last option, if you control the producer and it is also written in Python, is to use a task library like Celery to abstract the task/queue workings for you. I have used Celery for several large projects and have found it to be very well written. With the appropriate configuration, it will also handle the multiple-consumer issues for you.

papercrane answered Oct 06 '22 00:10