 

How can I set up Celery to call a custom initialization function before running my tasks?


I have a Django project and I'm trying to use Celery to submit tasks for background processing (http://ask.github.com/celery/introduction.html). Celery integrates well with Django and I've been able to submit my custom tasks and get back results.

The only problem is that I can't find a sane way of performing custom initialization in the daemon process. I need to call an expensive function that loads a lot of memory before I start processing the tasks, and I can't afford to call that function every time.

Has anyone had this problem before? Any ideas how to work around it without modifying the Celery source code?

Thanks

asked Jan 25 '10 by xelk



1 Answer

You can either write a custom loader or use signals.

Loaders have an on_task_init method, which is called when a task is about to be executed, and on_worker_init, which is called by the celeryd and celerybeat main processes.

Using signals is probably the easiest; the signals available are:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

    Dispatched when a task is about to be executed by the worker (or locally if using apply/or if CELERY_ALWAYS_EAGER has been set).

  • task_postrun(task_id, task, args, kwargs, retval)

    Dispatched after a task has been executed, under the same conditions as above.

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    Dispatched when a task is applied (not suitable for long-running operations).

Additional signals available in 0.9.x (current master branch on github):

  • worker_init()

    Called when celeryd has started, before the worker pool is initialized; on a system supporting fork, any memory allocated here will be inherited by the child worker processes.

  • worker_ready()

    Called when celeryd is able to receive tasks.

  • worker_shutdown()

    Called when celeryd is shutting down.

Here's an example precalculating something the first time a task is run in the process:

from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun

_precalc_table = {}

class PowersOfTwo(Task):

    def run(self, x):
        if x in _precalc_table:
            return _precalc_table[x]
        else:
            return x ** 2

tasks.register(PowersOfTwo)

def _precalc_numbers(**kwargs):
    if not _precalc_table:  # it's empty, so it hasn't been generated yet
        for i in range(1024):
            _precalc_table[i] = i ** 2

# need to use the registered instance for the sender argument
task_prerun.connect(_precalc_numbers, sender=tasks[PowersOfTwo.name])

If you want the function to be run for all tasks, just skip the sender argument.
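Since celery may not be to hand, the "skip the sender argument" behaviour can be sketched with a tiny stand-in signal dispatcher (a hypothetical `Signal` class, not Celery's real implementation): a receiver connected without a sender filter fires for every task, while one connected with a sender only fires for that task.

```python
class Signal:
    """Minimal stand-in for a signal dispatcher (illustration only)."""
    def __init__(self):
        self._receivers = []          # (receiver, sender_filter) pairs

    def connect(self, receiver, sender=None):
        self._receivers.append((receiver, sender))

    def send(self, sender, **kwargs):
        for receiver, wanted in self._receivers:
            if wanted is None or wanted is sender:   # no sender => all tasks
                receiver(sender=sender, **kwargs)

task_prerun = Signal()

calls = []
# No sender argument, so the handler runs before every task.
task_prerun.connect(lambda sender, **kw: calls.append(sender))

task_prerun.send(sender="tasks.add", task_id="1")
task_prerun.send(sender="tasks.mul", task_id="2")
print(calls)   # both tasks hit the handler: ['tasks.add', 'tasks.mul']
```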

answered Oct 21 '22 by asksol