 

Weird error with Redis and Celery

I'm getting the following error in one of my Celery workers:

2015-07-21T15:02:04.010066+00:00 app[worker.1]: Traceback (most recent call last):
2015-07-21T15:02:04.010069+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/celery/app/trace.py", line 296, in trace_task
2015-07-21T15:02:04.010070+00:00 app[worker.1]:     on_chord_part_return(task, state, R)
2015-07-21T15:02:04.010071+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/base.py", line 587, in on_chord_part_return
2015-07-21T15:02:04.010073+00:00 app[worker.1]:     deps.delete()
2015-07-21T15:02:04.010074+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/celery/result.py", line 773, in delete
2015-07-21T15:02:04.010076+00:00 app[worker.1]:     (backend or self.app.backend).delete_group(self.id)
2015-07-21T15:02:04.010078+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/base.py", line 329, in delete_group
2015-07-21T15:02:04.010079+00:00 app[worker.1]:     return self._delete_group(group_id)
2015-07-21T15:02:04.010081+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/base.py", line 499, in _delete_group
2015-07-21T15:02:04.010082+00:00 app[worker.1]:     self.delete(self.get_key_for_group(group_id))
2015-07-21T15:02:04.010083+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/redis.py", line 172, in delete
2015-07-21T15:02:04.010084+00:00 app[worker.1]:     self.client.delete(key)
2015-07-21T15:02:04.010085+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/redis/client.py", line 824, in delete
2015-07-21T15:02:04.010087+00:00 app[worker.1]:     return self.execute_command('DEL', *names)
2015-07-21T15:02:04.010088+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/redis/client.py", line 565, in execute_command
2015-07-21T15:02:04.010089+00:00 app[worker.1]:     return self.parse_response(connection, command_name, **options)
2015-07-21T15:02:04.010090+00:00 app[worker.1]:   File "/app/.heroku/python/lib/python2.7/site-packages/redis/client.py", line 579, in parse_response
2015-07-21T15:02:04.010091+00:00 app[worker.1]:     return self.response_callbacks[command_name](response, **options)
2015-07-21T15:02:04.010093+00:00 app[worker.1]: ValueError: invalid literal for int() with base 10: 'QUEUED'

What I find weird is that I see no call to int in the last line of the stack trace. QUEUED probably came from a task's status; I'm using it as a custom task status like this:

import logging

from celery import current_app
from celery.signals import before_task_publish

@before_task_publish.connect
def update_sent_state(sender=None, body=None, **kwargs):
    # the task may not exist if sent using `send_task` which
    # sends tasks by name, so fall back to the default result backend
    # if that is the case.
    task = current_app.tasks.get(sender)
    backend = task.backend if task else current_app.backend
    logging.debug("Setting status for %s", body["id"])

    backend.store_result(body['id'], None, "QUEUED")

What could be the issue here?


In case it's relevant, here's the code for my tasks. The only task I call directly is fetch.

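from celery import chord

# `app` is the project's Celery app; Podcast and do_fetch also come from the project.
@app.task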
def fetch(url_or_urls, subscribe=None):
    """This fetches a (list of) podcast(s) and stores it in the db. It assumes that it only gets called
    by Podcast.get_by_url, or some other method that knows whether a given podcast has
    already been fetched.

    If *subscribe* is given, it should be a User instance to be subscribed to the given podcasts."""
    if isinstance(url_or_urls, basestring):
        url_or_urls = [url_or_urls]
    body = _store_podcasts.s()
    if subscribe:
        body.link(_subscribe_user.s(user=subscribe))
    return chord([_fetch_podcast_data.s(url) for url in url_or_urls])(body)

@app.task
def _fetch_podcast_data(url):
    return do_fetch(url) # This function returns a dict of podcast data.

@app.task
def _store_podcasts(podcasts_data):
    """Given a list of dictionaries representing podcasts, store them all in the database."""
    podcasts = [Podcast(**pdata) for pdata in podcasts_data]
    return Podcast.objects.insert(podcasts)

@app.task
def _subscribe_user(podcasts, user):
    """Subscribe the given users to all the podcasts in the list."""
    return user.subscribe_multi(podcasts)

Is there anything else that could be relevant here?


Library versions as shown by pip freeze:

redis==2.10.3
celery==3.1.18
bigblind asked Jul 21 '15 at 15:07



2 Answers

It is hard to debug a bug like this without working code, but here is what I think it could be. Let's start here:

http://celery.readthedocs.org/en/latest/_modules/celery/backends/base.html#BaseBackend.store_result

def store_result(self, task_id, result, status,
                 traceback=None, request=None, **kwargs):
    """Update task state and result."""
    result = self.encode_result(result, status)
    self._store_result(task_id, result, status, traceback,
                       request=request, **kwargs)
    return result

It calls encode_result. Let's check that out:

def encode_result(self, result, status):
    if status in self.EXCEPTION_STATES and isinstance(result, Exception):
        return self.prepare_exception(result)
    else:
        return self.prepare_value(result)

It looks like "status" is expected to be one of the predefined state constants.

The code for those is here:

http://celery.readthedocs.org/en/latest/_modules/celery/states.html#state

and the docs are here:

http://celery.readthedocs.org/en/latest/reference/celery.states.html

It doesn't look like they expect to see something like "QUEUED" there. Try one of the predefined states.
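To make that suggestion concrete, here is a minimal sketch that checks a status string against Celery's built-in state names. The set is hand-copied from the celery.states reference for the 3.1 series (an assumption; newer versions add more states), and the helper is_builtin_state is hypothetical. In real code you would import the constants from celery.states rather than redefine them.

```python
# Built-in task states as documented for Celery 3.1.
# Hand-copied assumption: in real code use `from celery import states`.
PENDING = "PENDING"
RECEIVED = "RECEIVED"
STARTED = "STARTED"
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
REVOKED = "REVOKED"
RETRY = "RETRY"

BUILTIN_STATES = frozenset(
    [PENDING, RECEIVED, STARTED, SUCCESS, FAILURE, REVOKED, RETRY]
)


def is_builtin_state(status):
    """Hypothetical helper: True if *status* is one of the built-in state names."""
    return status in BUILTIN_STATES


if __name__ == "__main__":
    for status in ("QUEUED", PENDING):
        print(status, is_builtin_state(status))
```

In the question's before_task_publish handler, following this advice would mean passing one of these constants to store_result instead of "QUEUED".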

singer answered Sep 28 '22 at 23:09


The redis Python package expects the reply to a DEL command to always be an integer, namely the number of keys that were removed.

The call to int happens in the last line of the traceback (return self.response_callbacks[command_name](response, **options)), where self.response_callbacks['DEL'] is int.
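A simplified model of that dispatch shows why the error mentions int() even though no int call is visible in the traceback. RESPONSE_CALLBACKS and parse_response below are hypothetical stand-ins that mirror the traceback line; this is a sketch of the mechanism, not redis-py's actual code.

```python
# Hypothetical, simplified model of redis-py's reply post-processing:
# each command name maps to a callback that converts the raw reply.
RESPONSE_CALLBACKS = {
    "DEL": int,  # DEL normally replies with the number of keys removed
}


def parse_response(command_name, response, **options):
    """Apply the callback registered for *command_name*, if any, to the raw reply."""
    callback = RESPONSE_CALLBACKS.get(command_name)
    if callback is None:
        return response
    # Same shape as the traceback: response_callbacks[command_name](response, **options)
    return callback(response, **options)


if __name__ == "__main__":
    print(parse_response("DEL", 1))  # an integer reply passes through int() unchanged
    try:
        parse_response("DEL", "QUEUED")  # a status reply instead of an integer
    except ValueError as exc:
        print("ValueError:", exc)
```

Because the callback is looked up in a dict and called indirectly, the frame that raises shows only the generic dispatch line, while the exception message comes from int() itself.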

As a workaround, you could subclass redis.client.StrictRedis and set the DEL response callback to something other than int; just make sure you understand the implications.
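A replacement callback could look like the sketch below; the name lenient_int is hypothetical. One implication worth knowing: QUEUED is the status reply Redis sends for each command issued inside a MULTI transaction, so getting it back from a plain DEL plausibly means the connection was left in that state (an inference, not something the traceback proves). A callback like this hides the symptom rather than fixing the cause.

```python
def lenient_int(response, **options):
    """Hypothetical DEL callback: return an int for integer replies and
    pass any other reply (such as 'QUEUED') through instead of raising."""
    try:
        return int(response)
    except (TypeError, ValueError):
        # Non-integer reply, e.g. the QUEUED status sent for commands inside MULTI.
        return response


if __name__ == "__main__":
    print(lenient_int(1))
    print(lenient_int("QUEUED"))
```

With redis-py, a callback like this can be registered on a client instance with client.set_response_callback('DEL', lenient_int), which is also what a StrictRedis subclass could do in its __init__.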

garnertb answered Sep 29 '22 at 00:09