Getting all task IDs from nested chains and chords

I'm using Celery 3.1.9 with a Redis backend. The job that I'm running is made of several subtasks which run in chords and chains. The structure looks like this:

  1. prepare
  2. download data (a chord of 2 workers)
  3. parse and store downloaded data
  4. long-running chord of 4 workers
  5. finalize
  6. generate report

Each item in the list is a subtask, and they are all chained together. Steps 2 and 4 are chords. The whole thing is wired up from the inside out: first a chord is created for step 4, whose callback is a chain of 5 -> 6; then a chord is created for step 2, whose callback is a chain of 3 -> first chord; finally, a chain of 1 -> second chord is created. This chain is then started with delay() and its ID is stored in the database.

The problem is twofold. First, I want to be able to revoke the whole thing; second, I want a custom on_failure on my Task class that does some cleanup and reports the failure to the user.

Currently I store the chain's task ID. I thought I could use this to revoke the chain. Also, in case of an error I wanted to walk the chain to its root (in the on_failure handler) to retrieve the relevant record from the database. This doesn't work, because when you re-create an instance of AsyncResult with just the ID of the task, its parent attribute is None.
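To illustrate the "walk the chain to its root" idea without relying on AsyncResult.parent, here is a minimal sketch that assumes the parent links are persisted by the application itself when the job is submitted. The find_root helper, the parent_of mapping, and the IDs are all hypothetical and are not part of Celery.

```python
def find_root(task_id, parent_of):
    """Follow stored parent links until a task with no recorded parent is reached."""
    seen = set()
    current = task_id
    while parent_of.get(current) is not None:
        if current in seen:
            # Guard against corrupted data that would otherwise loop forever.
            raise ValueError("cycle in parent links at %r" % current)
        seen.add(current)
        current = parent_of[current]
    return current


# Hypothetical mapping of task ID -> parent task ID, as it might be
# stored in the database next to the job record.
parent_of = {
    "report-id": "finalize-id",
    "finalize-id": "prepare-id",
    "prepare-id": None,
}

root_id = find_root("report-id", parent_of)
print(root_id)
```

An on_failure handler could call something like this with the failing task's ID and then look up the job record by the root ID, provided the mapping was written before the chain was started.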

The second thing I tried was to store the result of serializable() called on the outer chain's result. This, however, does not return the entire tree of AsyncResult objects; it only returns the IDs of the first level in the chain (so not the IDs of the children in the chords).

The third thing I tried was to implement serializable() myself. As it turns out, the original method doesn't go further than two levels because the chain's children are celery.canvas.chord objects rather than AsyncResult instances.

An illustration of the problem:

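from pprint import pprint
from celery import chord

# foo and bar are Celery tasks defined elsewhere
job = chord([
    foo.si(),
    foo.si(),
    foo.si(),
], bar.si() | bar.si())
res = job.apply_async()
pprint(res.serializable())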

Prints the following:

(('50c9eb94-7a63-49dc-b491-6fce5fed3713',
  ('d95a82b7-c107-4a2c-81eb-296dc3fb88c3',
   [(('7c72310b-afc7-4010-9de4-e64cd9d30281', None), None),
    (('2cb80041-ff29-45fe-b40c-2781b17e59dd', None), None),
    (('e85ab83d-dd44-44b5-b79a-2bbf83c4332f', None), None)])),
 None)

The first ID is the ID of the callback chain, the second ID is from the chord task itself, and the last three are the actual tasks inside the chord. But I can't get at the results of the tasks inside the callback chain (i.e. the IDs of the two bar.si() calls).
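For reference, the IDs that do appear in that tuple can be pulled out with a small recursive walk. This is only a sketch: collect_task_ids is a hypothetical helper, not a Celery API, and it assumes Python 3, where the IDs are str instances. It simply treats every string in the nested structure as an ID.

```python
def collect_task_ids(node):
    """Return every string found in a nested serializable() tuple, depth-first."""
    if node is None:
        return []
    if isinstance(node, str):
        return [node]
    ids = []
    for item in node:  # tuples and lists are walked the same way
        ids.extend(collect_task_ids(item))
    return ids


# The structure printed above, copied verbatim.
serialized = (
    ("50c9eb94-7a63-49dc-b491-6fce5fed3713",
     ("d95a82b7-c107-4a2c-81eb-296dc3fb88c3",
      [(("7c72310b-afc7-4010-9de4-e64cd9d30281", None), None),
       (("2cb80041-ff29-45fe-b40c-2781b17e59dd", None), None),
       (("e85ab83d-dd44-44b5-b79a-2bbf83c4332f", None), None)])),
    None,
)

task_ids = collect_task_ids(serialized)
print(len(task_ids))
```

Such a list could be a starting point for revoking the job, but it still lacks the IDs of the two bar.si() calls, since they never appear in the tuple in the first place.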

Is there any way to get at the actual task IDs?

Blubber asked Apr 10 '14 14:04

1 Answer

One hacky way is to call the tasks with apply_async, save the task IDs, and wait for them manually. This gives you complete control over what happens, but waiting on async tasks should be a last resort. You can then access each task's ID, return value, and so on. For example:

task1 = a_task.apply_async()
task2 = b_task.apply_async()
task3 = c_task.apply_async()

tasks = [task1, task2, task3]

for task in tasks:
    task.wait()

marcos answered Sep 28 '22 02:09