I am quite new to Celery and I have been trying to set up a project with 2 separate queues (one to calculate and the other to execute). So far, so good.
My problem is that the workers in the execute queue need to instantiate a class with a unique object_id (one id per worker). I was wondering if I could write a custom worker initialization to initialize the object at startup and keep it in memory until the worker is killed.
I found a similar question on custom_task but the proposed solution does not work in my case.
Consider the following toy example:
celery.py
from celery import Celery

app = Celery('proj',
             broker='amqp://guest@localhost//',
             backend='amqp://',
             include=['proj.tasks'])

app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=60,
    CELERY_ROUTES={"proj.tasks.add1": {"queue": "q1"}},
)

if __name__ == '__main__':
    app.start()
tasks.py
from proj.celery import app
from celery.signals import worker_init

@worker_init.connect(sender='worker1@hostname')
def configure_worker1(*args, **kwargs):
    pass  # SETUP id=1 for add1 here???

@worker_init.connect(sender='worker2@hostname')
def configure_worker2(*args, **kwargs):
    pass  # SETUP id=2 for add1 here???

@app.task
def add1(y):
    return id + y

@app.task
def add(x, y):
    return x + y
initializing:
celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info
Is this the right approach? If so, what should I write in the configure_worker1 function in tasks.py to set up id at worker initialization?
Thanks
If you look at the Celery docs on tasks you see that to call a task synchronously, you use the apply() method as opposed to the apply_async() method. The docs also note that: "If the CELERY_ALWAYS_EAGER setting is set, it will be replaced by a local apply() call instead."
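As a minimal sketch (assuming the add task from the question above and the Celery 3.x-style settings it uses):

from proj.celery import app
from proj.tasks import add

# Asynchronous call: the task message goes to the broker and a worker runs it.
print(add.apply_async(args=(2, 3)).get())   # 5, computed by a worker

# Synchronous call: the task runs locally in the current process and blocks.
print(add.apply(args=(2, 3)).get())         # 5, computed locally

# With CELERY_ALWAYS_EAGER set, apply_async() falls back to a local apply(),
# which is handy for tests.
app.conf.CELERY_ALWAYS_EAGER = True
print(add.apply_async(args=(2, 3)).get())   # 5, computed locally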
With eager execution the caller then has to wait for every task to finish before it can continue. Normally, Celery instead uses the message broker (RabbitMQ in this example; Redis works the same way) to distribute tasks across multiple workers and to manage the task queue.
The number of worker processes/threads can be changed using the --concurrency argument and defaults to the number of available CPUs if not set. So, for example, --concurrency=5 would use 5 processes, meaning up to 5 tasks can run concurrently per worker.
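For instance, building on the commands from the question (the extra flag is passed through to the underlying celery worker command):

celery multi start worker1 -A proj -l info -Q q1 --concurrency=5
celery multi start worker2 -A proj -l info -Q q1 --concurrency=5
celery multi start worker3 -A proj -l info --concurrency=1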
I found the answer by following the Celery docs on task instantiation: http://docs.celeryproject.org/en/latest/userguide/tasks.html#instantiation. A task is not instantiated for every request; it is instantiated once per worker process and registered as a global instance, so state set in __init__ persists between calls.
The tasks.py looks like this:
from proj.celery import app
from celery import Task

class Task1(Task):
    def __init__(self):
        self._x = 1.0

class Task2(Task):
    def __init__(self):
        self._x = 2.0

@app.task(base=Task1)
def add1(y):
    return add1._x + y

@app.task(base=Task2)
def add2(y):
    return add2._x + y
initializing as before:
celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info
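As a quick sanity check, a sketch assuming the three workers above are running and the amqp result backend is reachable:

from proj.tasks import add1, add2

# add1 is routed to queue q1 by CELERY_ROUTES, so worker1 or worker2 runs it;
# the Task1 instance (and its _x) lives for the whole worker process.
print(add1.delay(3.0).get())   # 4.0

# add2 has no route entry, so it goes to the default queue handled by worker3.
print(add2.delay(3.0).get())   # 5.0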