
Initializing a worker with arguments using Celery

Tags:

python

celery

I'm having trouble finding out how to do something that seems like it should be relatively simple.

I'm using Celery 3.1 with Python 3 and want to initialize my workers with arguments so that they can use these details for setup.

Specifically, these workers will be consuming tasks that require interacting with a third-party API using authentication credentials. The worker has to pass the authentication details to the API server before consuming any tasks (the authentication details are stored in cookies after the first authentication request).

I would like to pass these login credentials to the worker when it is started from the CLI. I would then like the worker to authenticate using them and store the session for use when consuming future tasks (ideally this would be stored in an attribute that can be accessed from tasks).

Is this possible with Celery?

As a side note, I have considered passing a requests.session object (from the Python requests library) as a task argument, but that would require serializing it, which appears to be frowned upon.

Joshua Gilman asked Nov 21 '14 21:11





1 Answer

I would suggest using an abstract task base class and caching the requests.session.

From the Celery docs:

A task is not instantiated for every request, but is registered in the task registry as a global instance.

This means that the __init__ constructor will only be called once per process, and that the task class is semantically closer to an Actor.

This can also be useful to cache resources...

import requests
from celery import Task

class APITask(Task):
    """API requests task class."""

    abstract = True

    # the cached requests.session object
    _session = None

    def __init__(self):
        # since this class is instantiated once, use this method
        # to initialize and cache resources like a requests.session
        # or use a property like the example below which will create
        # a requests.session only the first time it's accessed
        pass

    @property
    def session(self):
        if self._session is None:
            # store the session object for the first time
            session = requests.Session()
            session.auth = ('user', 'pass')

            self._session = session

        return self._session
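
To see the caching behaviour in isolation, here is a minimal sketch that does not depend on Celery or requests. FakeSession and CachingTask are hypothetical stand-ins (for requests.Session and APITask respectively), introduced only to show that the property builds the resource on first access and reuses it afterwards:

```python
class FakeSession:
    """Hypothetical stand-in for requests.Session; counts instances created."""

    created = 0

    def __init__(self):
        type(self).created += 1
        self.auth = None


class CachingTask:
    """Hypothetical stand-in with the same shape as APITask, minus Celery."""

    # the cached session object
    _session = None

    @property
    def session(self):
        if self._session is None:
            # build the session only on first access
            session = FakeSession()
            session.auth = ('user', 'pass')
            self._session = session
        return self._session


task = CachingTask()     # Celery would do this once per worker process
first = task.session     # creates the session
second = task.session    # returns the cached one

print(first is second, FakeSession.created)  # True 1
```

Because Celery keeps one task instance per process, every call to the task in that process would go through the same cached object.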

Now when you create the tasks that will make API requests:

@app.task(base=APITask, bind=True)
def call_api(self, url):
    # self will refer to the task instance (because we're using bind=True)
    self.session.get(url)

You can also pass the API authentication options as an extra keyword argument to the app.task decorator; it will be set as an attribute on the task (in its __dict__). For example:

# pass a custom auth argument
@app.task(base=APITask, bind=True, auth=('user', 'pass'))
def call_api(self, url):
    pass

And make the base class use the passed authentication options:

class APITask(Task):
    """API requests task class."""

    abstract = True

    # the cached requests.session object
    _session = None

    # the API authentication
    auth = ()

    @property
    def session(self):
        if self._session is None:
            # store the session object for the first time
            session = requests.Session()
            # use the authentication that was passed to the task
            session.auth = self.auth

            self._session = session

        return self._session
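
One detail worth spelling out: `self._session = session` assigns on the task instance, not on the base class, so tasks with different auth values each get their own cached session. The following is a sketch in plain Python under that assumption. BaseTask, TaskA and TaskB are hypothetical names, and a dict stands in for requests.Session:

```python
class BaseTask:
    """Hypothetical stand-in for APITask without the Celery dependency."""

    _session = None
    auth = ()

    @property
    def session(self):
        if self._session is None:
            # a dict stands in for requests.Session here
            self._session = {'auth': self.auth}
        return self._session


class TaskA(BaseTask):
    # comparable to @app.task(base=APITask, auth=('alice', 'secret-a'))
    auth = ('alice', 'secret-a')


class TaskB(BaseTask):
    auth = ('bob', 'secret-b')


a, b = TaskA(), TaskB()

print(a.session['auth'])   # ('alice', 'secret-a')
print(b.session['auth'])   # ('bob', 'secret-b')
print(BaseTask._session)   # None, the class-level default is untouched
```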

You can read more on the Celery docs site:

  • Tasks Instantiation
  • Task Abstract Classes

Now back to your original question, which is about passing extra arguments to the worker from the command line:

The Celery docs have a section about this, Adding new command-line options. Here's an example of passing a username and a password to the worker from the command line:

$ celery worker -A appname --username user --password pass

The code:

from celery import bootsteps
from celery.bin import Option


app.user_options['worker'].add(
    Option('--username', dest='api_username', default=None, help='API username.')
)

app.user_options['worker'].add(
    Option('--password', dest='api_password', default=None, help='API password.')
)


class CustomArgs(bootsteps.Step):

    def __init__(self, worker, api_username, api_password, **options):
        # store the api authentication
        APITask.auth = (api_username, api_password)


app.steps['worker'].add(CustomArgs)
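
The flow from command line to task attribute can be simulated with the standard library alone. This is only a sketch: it assumes the Celery 3.1 options behave like optparse options, and the APITask and CustomArgs classes below are plain-Python stand-ins rather than the real Celery classes. It shows how each option's `dest` becomes a keyword argument of the step, and how assigning to the class attribute makes the credentials visible to every task that uses the base class:

```python
from optparse import OptionParser, Option


class APITask:
    """Stand-in for the Celery base task class defined above."""

    auth = ()


class CustomArgs:
    """Stand-in for the bootstep; the real one subclasses bootsteps.Step."""

    def __init__(self, worker, api_username=None, api_password=None, **options):
        # store the api authentication on the shared base class
        APITask.auth = (api_username, api_password)


parser = OptionParser(option_list=[
    Option('--username', dest='api_username', default=None, help='API username.'),
    Option('--password', dest='api_password', default=None, help='API password.'),
])

# simulates: celery worker -A appname --username user --password pass
options, _ = parser.parse_args(['--username', 'user', '--password', 'pass'])

# the parsed options reach the step as keyword arguments named after `dest`
CustomArgs(worker=None, **vars(options))

print(APITask.auth)  # ('user', 'pass')
```

Since the session property caches its result, the credentials need to be set before a task first touches `self.session`; setting them when the worker starts, as the bootstep does, satisfies that.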
Pierre answered Oct 13 '22 06:10
