Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Can a celery worker/server accept tasks from a non celery producer?

I want to use a comet server written using java nio for sending out live updates. When receiving information I want it to scan the data, and send tasks to worker threads via rabbitmq. Ideally I would like a celery server to sit on the other end of rabbit, managing a pool of worker threads that will handle these tasks.

However, from my understanding, celery works by sitting on both ends of rabbitmq, and it essentially takes over the role of producer and consumer by being embedded in both the consumer and producer's code. Is there a way to set up celery as I described above? Thanks

like image 218
Jaigus Avatar asked Aug 15 '12 06:08

Jaigus


People also ask

How does Celery execute tasks?

Process of Task Execution by Celery can be broken down into:Your application sends the tasks to the task broker, it is then reserved by a worker for execution & finally the result of task execution is stored in the result backend.

How does a Celery worker work?

Dedicated worker processes constantly monitor task queues for new work to perform. Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task the client adds a message to the queue, the broker then delivers that message to a worker.

How many tasks can Celery handle?

celery beats only trigger those 1000 tasks (by the crontab schedule), not run them. If you want to run 1000 tasks in parallel, you should have enough celery workers available to run those tasks.


1 Answers

Yes, of cource !

You can add Custom Message Consumers to a celery app.

Please refer to Extensions and Bootsteps in celery documents.

Here is a part of example code in the link above:

from celery import Celery
from celery import bootsteps
from kombu import Consumer, Exchange, Queue

my_queue = Queue('custom', Exchange('custom'), 'routing_key')

app = Celery(broker='amqp://')


class MyConsumerStep(bootsteps.ConsumerStep):

    def get_consumers(self, channel):
        return [Consumer(channel,
                         queues=[my_queue],
                         callbacks=[self.handle_message],
                         accept=['json'])]

    def handle_message(self, body, message):
        print('Received message: {0!r}'.format(body))
        message.ack()
app.steps['consumer'].add(MyConsumerStep)

Test it:

python -m celery -A main worker

See also: Using Celery with existing RabbitMQ messages

like image 124
Comzyh Avatar answered Jan 04 '23 03:01

Comzyh