
Initializing Different Celery Workers with Different Values

I am using Celery to run long-running tasks on Hadoop. Each task executes a Pig script on Hadoop which runs for about 30 minutes to 2 hours.

My current Hadoop setup has 4 queues: a, b, c, and default. All tasks are currently executed by a single worker, which submits each job to a single queue.

I want to add 3 more workers which submit jobs to other queues, one worker per queue.

The problem is that the queue is currently hard-coded, and I wish to make it configurable per worker.

I searched a lot but I am unable to find a way to pass each celery worker a different queue value and access it in my task.

I start my Celery worker like so:

celery -A app.celery worker

I wish to pass some additional arguments on the command line itself and access them in my task, but Celery complains that it doesn't understand my custom argument.
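For context, Celery 4 does support registering custom worker command-line options through `app.user_options['worker']`, which may be the mechanism you're after. Below is a minimal sketch of just the option-registration half, assuming Celery 4's argparse-based API; `--queue-letter` and `add_queue_letter_argument` are names I've made up for illustration:

```python
import argparse

# In app.py, before the worker starts, the option would be registered on
# the Celery app (Celery 4's argparse-style user options):
#
#   app.user_options['worker'].add(add_queue_letter_argument)

def add_queue_letter_argument(parser):
    # Adds a hypothetical --queue-letter flag to the worker command line.
    parser.add_argument(
        '--queue-letter', default='default',
        help='Hadoop queue this worker submits Pig jobs to (a, b, c or default)')
```

The worker could then be started with `celery -A app.celery worker --queue-letter=a`. To actually consume the value inside the worker you would additionally register a bootstep, per Celery's "Adding new command-line options" documentation.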

I plan to run all the workers on the same host by setting the --concurrency=3 parameter. Is there any solution to this problem?

Thanks!

EDIT

The current scenario is like this. Every time I try to execute the task print_something with tasks.print_something.delay(), it only prints C.

@celery.task()
def print_something():
    print("C")

I need to have the workers print a variable letter based on what value I pass to them while starting them.

@celery.task()
def print_something():
    print("<Variable Value Per Worker Here>")
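One simple way to get a per-worker value without custom Celery options is an environment variable set when each worker is launched. A minimal sketch, using `QUEUE_LETTER` as a variable name of my own choosing:

```python
import os

# Each worker is launched with its own value, e.g.:
#   QUEUE_LETTER=a celery -A app.celery worker
# (In the real project this function would carry the @celery.task() decorator.)
def print_something():
    # Read the per-worker value at run time; fall back to "default".
    letter = os.environ.get("QUEUE_LETTER", "default")
    print(letter)
    return letter
```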
Asked Jul 07 '17 by Pravin Umamaheswaran




1 Answer

What I usually do is start the workers first (no tasks are executed yet), and then, in another script (say manage.py), add commands with parameters that start specific tasks, or the same task with different arguments.

In manage.py:

import click
from tasks import some_task

@click.command()
@click.argument("params", nargs=-1)
def run_task(params):
    # Forward the command-line parameters to the task
    some_task.apply_async(args=params)

if __name__ == "__main__":
    run_task()

And this will start the tasks as needed.
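Since the question is about one worker per Hadoop queue, it may also help to route each dispatch explicitly: `apply_async` accepts a `queue` option, and a worker started with `-Q` consumes only from the named queue. A hypothetical extension of the manage.py idea (the `--queue` flag and the echoed message are illustrative; the real call would be `some_task.apply_async(queue=queue)`):

```python
import click

@click.command()
@click.option("--queue", "-q", default="default",
              help="Celery queue to route the task to (a, b, c or default)")
def run_task(queue):
    # In the real project this would be:
    #   some_task.apply_async(queue=queue)
    # A worker started with `celery -A app.celery worker -Q a`
    # then consumes only from queue "a".
    click.echo(f"dispatching some_task to queue {queue!r}")

if __name__ == "__main__":
    run_task()
```

With this, `python manage.py run_task --queue a` targets the worker bound to queue a, and so on for b, c, and default.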

Answered Jan 02 '23 by VKolev