I want to use Celery to run jobs on a GPU server with four Tesla cards. I run the Celery worker with a pool of four workers such that each card always runs one job.
My problem is how to instruct the workers to each claim one GPU. Currently I rely on the assumption that the worker processes should all have contiguous process IDs:
device_id = os.getpid() % self.ndevices
However, I this is not guaranteed to always work, i.e. when worker processes get restarted over time. So ideally, I would like to get the ID of each worker directly. Can someone tell me if it is possible to inspect the worker from within a task or can suggest a different solution to distribute the jobs across the GPUs?
You can access the identifier of the executing task object via app.Task.request.id . What is this? To get the currently executing task, you can run current_task. request from the current_task module after importing it with from celery import current_task .
When you run a celery worker, it creates one parent process to manage the running tasks. This process handles the book keeping features like sending/receiving queue messages, registering tasks, killing hung tasks, tracking status, etc.
The "shared_task" decorator allows creation of Celery tasks for reusable apps as it doesn't need the instance of the Celery app. It is also easier way to define a task as you don't need to import the Celery app instance.
If you are using CELERYD_POOL = 'processes'
, the worker pool is handled by billiard
, which does happen to expose its 0-based process index:
from billiard import current_process
from celery import task
@task
def print_info():
# This will print an int in [0..concurrency[
print current_process().index
The index
is 0-based, and if a worker happens to be restarted it will keep its index.
I couldn't find any documentation regarding the index
value though :/
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With