Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

celery nested chords not calling callback in the right order

I am trying to call a method which creates a chord and use that chord in another chord that I build in another method. But I am not able to get it call the callback in the right order.

#celery obj
app = Celery('tasks', backend=BACKEND, broker=BROKER)

Here are the simple methods that I use to test this:

@app.task
def first_task(args):
    return 'first_task'


@app.task
def first_body(args):
    return 'first_body'


@app.task
def second_body(args):
    return 'second_body'


@app.task
def third_body(args):
    return 'third_body'

Here is how I create my chords:

@app.task
def simple_chord(args):
    c = chord(group([first_task.s(args)]), body=first_body.s())
    return c()

# this works as expected 
# first_task -> first_body -> second_body -> third_body
def test_chords():
    c = chord(group([chord(group([chord(group([first_task.s('foo')]),
                                        body=first_body.s())]),
                           body=second_body.s())]),
              body=third_body.s())

    c.delay()

# this does not work as expected
# first_task -> second_body -> first_body -> third_body
def test_chords_2():
    c = chord(group([chord(group([simple_chord.s('foo')]),
                           body=second_body.s())]),
              body=third_body.s())

    c.delay()

When I build the chords in one place as in test_chords it does what I expect it to do (first_task -> first_body -> second_body -> third_body). However I want to be able to get the chord I build in simple_chord method and use it in another chord as in test_chords_2 but it does not wait for the first_body to be executed before executing second_body. It executes in this order instead: first_task -> second_body -> first_body -> third_body

What I see in the worker console:

[2018-06-25 16:55:21,466: INFO/MainProcess] Received task: tasks.simple_chord[c78eb259-e1e1-4b4d-b053-247afce4d536]  
[2018-06-25 16:55:21,660: INFO/MainProcess] Received task: tasks.first_task[fca138d2-c672-4299-a2ed-1bb5cacb66da]  
[2018-06-25 16:55:21,685: INFO/ForkPoolWorker-1] Task tasks.simple_chord[c78eb259-e1e1-4b4d-b053-247afce4d536] succeeded in 0.19740361299773213s: <AsyncResult: c4db6bbf-d150-44db-a34e-99ea6bd71012>
[2018-06-25 16:55:21,688: INFO/MainProcess] Received task: tasks.second_body[afc32047-3239-4118-bb2c-c116f9a241bf]  
[2018-06-25 16:55:23,340: INFO/ForkPoolWorker-1] Task tasks.first_task[fca138d2-c672-4299-a2ed-1bb5cacb66da] succeeded in 1.639255696994951s: 'first_task'
[2018-06-25 16:55:23,344: INFO/MainProcess] Received task: tasks.first_body[c4db6bbf-d150-44db-a34e-99ea6bd71012]  
[2018-06-25 16:55:24,099: INFO/ForkPoolWorker-1] Task tasks.second_body[afc32047-3239-4118-bb2c-c116f9a241bf] succeeded in 0.7374479610007256s: 'second_body'
[2018-06-25 16:55:24,103: INFO/MainProcess] Received task: tasks.third_body[9a90c2da-cd03-4f55-9675-9194acd7d42f]  
[2018-06-25 16:55:24,656: INFO/ForkPoolWorker-1] Task tasks.first_body[c4db6bbf-d150-44db-a34e-99ea6bd71012] succeeded in 0.5298690330091631s: 'first_body'
[2018-06-25 16:55:25,247: INFO/ForkPoolWorker-1] Task tasks.third_body[9a90c2da-cd03-4f55-9675-9194acd7d42f] succeeded in 0.5770523219980532s: 'third_body'

My question is what would be the best way to do something like this?

like image 466
Hakan Değirmen Avatar asked Oct 16 '25 18:10

Hakan Değirmen


1 Answers

Solved the issue by calling the simple_chord normally (not as a signature) in test_chords_2. I was making it run asynchronous unnecessarily:

def test_chords_2():
c = chord(group([chord(group([simple_chord('foo')]),
                       body=second_body.s())]),
          body=third_body.s())

print(c)
c.delay()
like image 113
Hakan Değirmen Avatar answered Oct 19 '25 07:10

Hakan Değirmen



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!