I am using Celery to run long-running tasks on Hadoop. Each task executes a Pig script on Hadoop which runs for about 30 minutes to 2 hours.
My current Hadoop setup has 4 queues: a, b, c, and default. All tasks are currently being executed by a single worker, which submits each job to a single queue.
I want to add 3 more workers which submit jobs to other queues, one worker per queue.
The problem is that the queue is currently hard-coded, and I wish to make it variable per worker.
I searched a lot, but I am unable to find a way to pass each Celery worker a different queue value and access it in my task.
I start my Celery worker like so:
celery -A app.celery worker
I wish to pass some additional argument on the command line itself and access it in my task, but Celery complains that it doesn't understand my custom argument.
I plan to run all the workers on the same host by setting the --concurrency=3 parameter. Is there any solution to this problem?
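To make the problem concrete, this is roughly the kind of invocation I have in mind (the --queue-name option is made up here; it is this sort of custom argument that Celery rejects as unknown):

celery -A app.celery worker --queue-name=a
celery -A app.celery worker --queue-name=b
celery -A app.celery worker --queue-name=c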
Thanks!
EDIT
The current scenario is like this: every time I try to execute the task print_something by calling tasks.print_something.delay(), it only prints the letter C.
@celery.task()
def print_something():
    print "C"
I need the workers to print a different letter depending on the value I pass to each of them when starting them.
@celery.task()
def print_something():
    print "<Variable Value Per Worker Here>"
If you look at the Celery docs on tasks, you will see that to call a task synchronously you use the apply() method as opposed to the apply_async() method. The docs also note that if the CELERY_ALWAYS_EAGER setting is set, apply_async() will be replaced by a local apply() call instead.
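For example, with the print_something task from the question, the two calling styles would look roughly like this (apply() runs the task in the current process and blocks; apply_async() sends it to the broker for a worker to pick up):

from tasks import print_something

print_something.apply()        # executes locally, returns an EagerResult
print_something.apply_async()  # queues the task for a worker to execute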
As for --concurrency: by default Celery uses multiprocessing to perform concurrent execution of tasks. The number of worker processes/threads can be changed using the --concurrency argument, and it defaults to the number of available CPUs if not set.
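So a single worker with three worker processes on the same host would be started with something like:

celery -A app.celery worker --concurrency=3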
What I usually do is, after starting the workers (at which point no tasks are executed yet), define commands with parameters in another script (say manage.py) to start specific tasks, or the same task with different arguments.
In manage.py:

import click
from tasks import some_task

@click.command()
@click.argument("params")
def run_task(params):
    some_task.apply_async(args=(params,))

if __name__ == "__main__":
    run_task()
And this will start the tasks as needed.
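Invoked from the shell, this would look something like the following (the argument value here is just a placeholder):

python manage.py some-param-value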