I'm getting the following error in one of my Celery workers:
2015-07-21T15:02:04.010066+00:00 app[worker.1]: Traceback (most recent call last):
2015-07-21T15:02:04.010069+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/app/trace.py", line 296, in trace_task
2015-07-21T15:02:04.010070+00:00 app[worker.1]: on_chord_part_return(task, state, R)
2015-07-21T15:02:04.010071+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/base.py", line 587, in on_chord_part_return
2015-07-21T15:02:04.010073+00:00 app[worker.1]: deps.delete()
2015-07-21T15:02:04.010074+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/result.py", line 773, in delete
2015-07-21T15:02:04.010076+00:00 app[worker.1]: (backend or self.app.backend).delete_group(self.id)
2015-07-21T15:02:04.010078+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/base.py", line 329, in delete_group
2015-07-21T15:02:04.010079+00:00 app[worker.1]: return self._delete_group(group_id)
2015-07-21T15:02:04.010081+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/base.py", line 499, in _delete_group
2015-07-21T15:02:04.010082+00:00 app[worker.1]: self.delete(self.get_key_for_group(group_id))
2015-07-21T15:02:04.010083+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/redis.py", line 172, in delete
2015-07-21T15:02:04.010084+00:00 app[worker.1]: self.client.delete(key)
2015-07-21T15:02:04.010085+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/redis/client.py", line 824, in delete
2015-07-21T15:02:04.010087+00:00 app[worker.1]: return self.execute_command('DEL', *names)
2015-07-21T15:02:04.010088+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/redis/client.py", line 565, in execute_command
2015-07-21T15:02:04.010089+00:00 app[worker.1]: return self.parse_response(connection, command_name, **options)
2015-07-21T15:02:04.010090+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/redis/client.py", line 579, in parse_response
2015-07-21T15:02:04.010091+00:00 app[worker.1]: return self.response_callbacks[command_name](response, **options)
2015-07-21T15:02:04.010093+00:00 app[worker.1]: ValueError: invalid literal for int() with base 10: 'QUEUED'
What I find weird is that I see no call to int in the last line of the stack trace. 'QUEUED' probably comes in as a task's state; I'm using it as a custom state, which I set like this:
import logging

from celery import current_app
from celery.signals import before_task_publish


@before_task_publish.connect
def update_sent_state(sender=None, body=None, **kwargs):
    # The task may not exist if sent using `send_task`, which
    # sends tasks by name, so fall back to the default result
    # backend if that is the case.
    task = current_app.tasks.get(sender)
    backend = task.backend if task else current_app.backend
    logging.debug("Setting status for %s" % body["id"])
    backend.store_result(body['id'], None, "QUEUED")
What could be the issue here?
In case it's relevant, here's the code for my tasks. The only task I call directly is fetch.
@app.task
def fetch(url_or_urls, subscribe=None):
    """Fetch a (list of) podcast(s) and store it in the db.

    It assumes that it only gets called by Podcast.get_by_url, or some other
    method that knows whether a given podcast has already been fetched.

    If *subscribe* is given, it should be a User instance to be subscribed
    to the given podcasts.
    """
    if isinstance(url_or_urls, basestring):
        url_or_urls = [url_or_urls]
    body = _store_podcasts.s()
    if subscribe:
        body.link(_subscribe_user.s(user=subscribe))
    return chord([_fetch_podcast_data.s(url) for url in url_or_urls])(body)


@app.task
def _fetch_podcast_data(url):
    return do_fetch(url)  # This function returns a dict of podcast data.


@app.task
def _store_podcasts(podcasts_data):
    """Given a list of dictionaries representing podcasts, store them all in the database."""
    podcasts = [Podcast(**pdata) for pdata in podcasts_data]
    return Podcast.objects.insert(podcasts)


@app.task
def _subscribe_user(podcasts, user):
    """Subscribe the given user to all the podcasts in the list."""
    return user.subscribe_multi(podcasts)
Is there anything else that could be relevant here?
Library versions as shown by pip freeze:
redis==2.10.3
celery==3.1.18
It is hard to debug a bug like this without working code. Here is what I think it could be. Let's start here:
http://celery.readthedocs.org/en/latest/_modules/celery/backends/base.html#BaseBackend.store_result
def store_result(self, task_id, result, status,
                 traceback=None, request=None, **kwargs):
    """Update task state and result."""
    result = self.encode_result(result, status)
    self._store_result(task_id, result, status, traceback,
                       request=request, **kwargs)
    return result
It calls encode_result. Let's check that out:
def encode_result(self, result, status):
    if status in self.EXCEPTION_STATES and isinstance(result, Exception):
        return self.prepare_exception(result)
    else:
        return self.prepare_value(result)
It looks like "status" is expected to be one of the predefined STATE constants.
Their code is here:
http://celery.readthedocs.org/en/latest/_modules/celery/states.html#state
And the docs are here:
http://celery.readthedocs.org/en/latest/reference/celery.states.html
It does not look like Celery expects to see something like "QUEUED" there. Try one of the predefined states.
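For example, here is a minimal sketch of the handler from the question with only one change: the custom "QUEUED" string is swapped for a constant from celery.states (I picked states.RECEIVED arbitrarily; any member of states.ALL_STATES would do):

from celery import current_app, states
from celery.signals import before_task_publish


@before_task_publish.connect
def update_sent_state(sender=None, body=None, **kwargs):
    task = current_app.tasks.get(sender)
    backend = task.backend if task else current_app.backend
    # states.RECEIVED is one of Celery's predefined state constants,
    # so the backend never sees a non-standard string.
    backend.store_result(body['id'], None, states.RECEIVED)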
The redis Python package expects the response from the DEL command to always be an integer, which I assume is the count of deleted keys.
The call to int happens in the last line of the traceback (return self.response_callbacks[command_name](response, **options)), where self.response_callbacks['DEL'] is equal to int.
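You can check this yourself in an interactive session; the snippet below only inspects the callback table and does not need a running Redis server (redis-py connects lazily, when the first command is sent):

import redis

client = redis.StrictRedis()
print(client.response_callbacks['DEL'])  # <type 'int'> on Python 2 -- the bare int builtin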
As a workaround, you could subclass redis.client.StrictRedis and set the DEL response callback to something other than int; just make sure you're familiar with the implications.
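A minimal sketch of what that workaround could look like; the class name, helper function, and the fallback value of 0 are my own choices for illustration, not anything from redis-py:

import redis


def _tolerant_del_callback(response):
    """Parse DEL replies, but don't blow up if the reply isn't an integer."""
    try:
        return int(response)
    except (TypeError, ValueError):
        return 0


class TolerantRedis(redis.StrictRedis):
    """StrictRedis whose DEL callback tolerates non-integer replies."""
    RESPONSE_CALLBACKS = dict(redis.StrictRedis.RESPONSE_CALLBACKS,
                              DEL=_tolerant_del_callback)

This only illustrates overriding the callback itself; wiring such a client class into Celery's Redis result backend is a separate step that is not shown here.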