 

How can I set up Celery to call a custom worker initialization?

Tags: python, celery

I am quite new to Celery and I have been trying to set up a project with 2 separate queues (one to calculate and the other to execute). So far, so good.

My problem is that the workers in the execute queue need to instantiate a class with a unique object_id (one id per worker). I was wondering if I could write a custom worker initialization to initialize the object at start and keep it in memory until the worker is killed.

I found a similar question on custom_task but the proposed solution does not work in my case.

Considering the following toy example:

celery.py

from celery import Celery

app = Celery('proj',
             broker='amqp://guest@localhost//',
             backend='amqp://',
             include=['proj.tasks'])

app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=60,
    CELERY_ROUTES = {"proj.tasks.add1": {"queue": "q1"}},
)

if __name__ == '__main__':
    app.start()

tasks.py

from proj.celery import app
from celery.signals import worker_init

@worker_init.connect(sender='worker1@hostname')
def configure_worker1(*args, **kwargs):
    # SETUP id=1 for add1 here???
    pass

@worker_init.connect(sender='worker2@hostname')
def configure_worker2(*args, **kwargs):
    # SETUP id=2 for add1 here???
    pass

@app.task
def add1(y):
    return id + y

@app.task
def add(x, y):
    return x + y

initializing:

celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info

Is this the right approach? If so, what should I write in the configure_worker1 function in tasks.py to set up id at worker initialization?

Thanks

asked Nov 02 '14 by b3rt0

People also ask

How do you call Celery synchronously?

If you look at the Celery docs on tasks, you see that to call a task synchronously you use the apply() method as opposed to the apply_async() method. The docs also note that if the CELERY_ALWAYS_EAGER setting is enabled, apply_async() calls will be replaced by a local apply() call instead.
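A minimal sketch, assuming the add task defined in the tasks.py above and a running worker:

from proj.tasks import add

res = add.apply(args=(2, 3))        # runs the task synchronously in the current process
print(res.get())                    # 5, available immediately

async_res = add.apply_async(args=(2, 3))  # sends the task to the broker for a worker to pick up
print(async_res.get(timeout=10))          # blocks until the worker stores the result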

Can Celery run multiple workers?

Yes. You can start any number of workers consuming from the same queue; the broker (for example RabbitMQ or Redis) distributes the tasks across the workers and manages the task queue.

How do you set the number of workers in Celery?

The number of worker processes/threads can be changed using the --concurrency argument and defaults to the number of available CPUs if not set. So for example --concurrency=5 would use 5 processes, meaning 5 tasks can run concurrently.
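For example, a single worker limited to 5 processes could be started like this (using the proj app from this question):

celery -A proj worker -l info --concurrency=5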


1 Answer

I found the answer by following http://docs.celeryproject.org/en/latest/userguide/tasks.html#instantiation

The tasks.py looks like this:

from proj.celery import app
from celery import Task

class Task1(Task):
    def __init__(self):
        self._x = 1.0

class Task2(Task):
    def __init__(self):
        self._x = 2.0

@app.task(base=Task1)
def add1(y):
    return add1._x + y

@app.task(base=Task2)
def add2(y):
    return add2._x + y

initializing as before:

celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info
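
A quick way to verify the behaviour (a sketch, assuming the workers above are running and the broker from celery.py is reachable) is to call both tasks and compare the results:

from proj.tasks import add1, add2

print(add1.delay(1).get(timeout=10))  # 2.0, since Task1.__init__ sets self._x = 1.0
print(add2.delay(1).get(timeout=10))  # 3.0, since Task2.__init__ sets self._x = 2.0

This works because a task is not instantiated for every request: the base class is instantiated once per worker process, so _x is set at start-up and stays in memory until the worker is killed.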
answered Oct 05 '22 by b3rt0