I set an environment variable in supervisord:
[program:worker]
directory = /srv/app/
command=celery -A tasks worker -Q default -l info -n default_worker.%%h
environment=BROKER="amqp://admin:password@xxxxx:5672//"
Within my celeryconfig.py I then try to read that variable like this.
BROKER = os.environ['BROKER']
But I still get the key the error below, why?
File "/usr/local/lib/python2.7/dist-packages/celery/loaders/base.py", line 106, in import_module
return importlib.import_module(module, package=package)
File "/usr/lib/python2.7/importlib/__init__.py", line 37, in import_module
__import__(name)
File "/srv/app/celeryconfig.py", line 6, in <module>
BROKER = os.environ['BROKER']
File "/usr/lib/python2.7/UserDict.py", line 23, in __getitem__
raise KeyError(key)
KeyError: 'BROKER
There is a file dump of the envs as suggested in the comments:
{
'SUPERVISOR_GROUP_NAME': 'celery_default_worker',
'TERM': 'linux',
'SUPERVISOR_SERVER_URL': 'unix: ///var/run/supervisor.sock',
'UPSTART_INSTANCE': '',
'RUNLEVEL': '2',
'UPSTART_EVENTS': 'runlevel',
'PREVLEVEL': 'N',
'SUPERVISOR_PROCESS_NAME': 'celery_default_worker',
'UPSTART_JOB': 'rc',
'PWD': '/',
'SUPERVISOR_ENABLED': '1',
'runlevel': '2',
'PATH': '/usr/local/sbin: /usr/local/bin: /sbin: /bin: /usr/sbin: /usr/bin',
'previous': 'N'
}
It looks like a known bug in supervisord
:
http://github.com/Supervisor/supervisor/issues/91 (kindof resolved)
http://github.com/Supervisor/supervisor/pull/550 (pending)
In that case, moving your environment spec to global scope (for supervisord process itself) may be an acceptable workaround.
Finally, if all else fails, wrap celery
in a shell script that accepts this specific environment variable as command line argument.
This answer is most likely not the cause, check https://stackoverflow.com/a/28829162/1589147 for information on a related supervisord bug instead.
I can reproduce your error partially. I do not see the error when celery runs within supervisor. I see the error when I try to run the task from an environment outside supervisor where I did not set the BROKER
environment variable. celeryconfig.py
is executed both by celery and by anything which tries to execute a task.
I am not certain if this issue is exactly what you have come across, if you could share how you are executing the tasks and when that exception is raised it may help.
For example, if I try to run the task from ipython
an error is generated which matches your error.
In [1]: from tasks import add
In [2]: add.delay(2,3)
...
21 if hasattr(self.__class__, "__missing__"):
22 return self.__class__.__missing__(self, key)
---> 23 raise KeyError(key)
24 def __setitem__(self, key, item): self.data[key] = item
25 def __delitem__(self, key): del self.data[key]
KeyError: 'BROKER'
The celeryconfig.py
is loaded locally in order to establish a connection to the celery broker and backend. I am unable to execute the task without setting the BROKER
environment variable.
If I set the environment variable before executing my task the same code works for me.
In [3]: import os
In [4]: os.environ["BROKER"] = "broker is set"
In [5]: add.delay(2,3)
Out[5]: <AsyncResult: 0f3xxxx-87fa-48d7-9258-173bdd2052ca>
Here are the files I used in case it helps.
supervisor.conf
: supervisord -c supervisor.conf
[unix_http_server]
file=/tmp/supervisor.sock
[supervisord]
loglevel = info
nodaemon = true
identifier = supervisor
[supervisorctl]
serverurl=unix:///tmp/supervisor.sock
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[program:worker]
command=/app/srv/main-env/bin/celery -A tasks worker -Q default -l info -n default_worker.%%h
environment=BROKER="amqp://admin:password@xxxxx:5672//"
directory=/app/srv/
numprocs=1
stdout_logfile=/app/srv/worker.log
stderr_logfile=/app/srv/worker.log
autostart=true
autorestart=true
startsecs=10
stopwaitsecs = 600
killasgroup=true
priority=998
celeryconfig.py
:
import os
BROKER = os.environ['BROKER']
tasks.py
:
from celery import Celery
app = Celery(
'tasks',
backend='amqp',
broker='amqp://admin:password@xxxxx:5672//')
app.config_from_object('celeryconfig')
@app.task
def add(x, y):
return x + y
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With