I have a Django project and I'm trying to use Celery to submit tasks for background processing (http://ask.github.com/celery/introduction.html). Celery integrates well with Django, and I've been able to submit my custom tasks and get back results.
The only problem is that I can't find a sane way of performing custom initialization in the daemon process. Before I start processing tasks I need to call an expensive function that loads a lot of data into memory, and I can't afford to call that function for every task.
Has anyone had this problem before? Any ideas how to work around it without modifying the Celery source code?
Thanks
You can either write a custom loader, or use the signals.
Loaders have the on_task_init method, which is called when a task is about to be executed, and on_worker_init, which is called by the celeryd/celerybeat main process.
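For the loader route, a rough sketch (the import path, class name, and method signatures here are assumptions based on later Celery releases, not confirmed for 0.8.x; check the version you're running):

from celery.loaders.base import BaseLoader

_cache = {}

class PrewarmLoader(BaseLoader):
    # Hypothetical custom loader, selected e.g. via the
    # CELERY_LOADER environment variable.
    def on_worker_init(self):
        # Runs once in the main celeryd process before any tasks
        # execute; a natural home for expensive, memory-heavy setup.
        for i in range(1024):
            _cache[i] = i ** 2

    def on_task_init(self, task_id, task):
        # Runs before each task is executed; keep this cheap.
        pass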
Using signals is probably the easiest; the signals available are:
0.8.x:
task_prerun(task_id, task, args, kwargs)
Dispatched when a task is about to be executed by the worker (or locally if using apply, or if CELERY_ALWAYS_EAGER has been set).
task_postrun(task_id, task, args, kwargs, retval)
Dispatched after a task has been executed, under the same conditions as above.
task_sent(task_id, task, args, kwargs, eta, taskset)
Dispatched when a task is applied (not a good place for long-running operations).
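Connecting a receiver to any of these follows the same pattern: the documented values arrive as keyword arguments. A small sketch for illustration (the logging receiver is mine, not from the answer):

from celery.signals import task_postrun

def report_result(task_id=None, task=None, args=None, kwargs=None,
                  retval=None, **rest):
    # Runs in the worker right after each task finishes.
    print("task %s returned %r" % (task_id, retval))

task_postrun.connect(report_result)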
Additional signals available in 0.9.x (the current master branch on GitHub):
worker_init()
Called when celeryd has started (before tasks are initialized, so on a system supporting fork, any memory changes will be copied to the child worker processes).
worker_ready()
Called when celeryd is able to receive tasks.
worker_shutdown()
Called when celeryd is shutting down.
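Given the question, worker_init looks like the right hook: do the expensive load once in the main process and let fork share it copy-on-write with the children. A minimal sketch (the table and its contents are placeholders for your real initialization):

from celery.signals import worker_init

_expensive_data = {}

def load_expensive_data(**kwargs):
    # Runs once when celeryd starts, before the worker processes
    # fork, so anything loaded here is inherited by the children.
    for i in range(1024):
        _expensive_data[i] = i ** 2

worker_init.connect(load_expensive_data)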
Here's an example precalculating something the first time a task is run in the process:
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun

_precalc_table = {}

class PowersOfTwo(Task):
    def run(self, x):
        if x in _precalc_table:
            return _precalc_table[x]
        else:
            return x ** 2

tasks.register(PowersOfTwo)

def _precalc_numbers(**kwargs):
    if not _precalc_table:  # it's empty, so it hasn't been generated yet
        for i in range(1024):
            _precalc_table[i] = i ** 2

# need to use the registered instance for the sender argument
task_prerun.connect(_precalc_numbers, sender=tasks[PowersOfTwo.name])
If you want the function to be run for all tasks, just skip the sender argument.
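That is, to run the precalculation before every task, regardless of type:

task_prerun.connect(_precalc_numbers)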