
How to roll back on exceptions within Celery when using SQLAlchemy + PostgreSQL

This is what my code looks like:

import transaction

@app.task(name='task_name')
def task_name_fn(*args, **kwargs):
   with transaction.manager:
       try:
          actual_fn(*args, **kwargs)
          transaction.commit()
       except:
          transaction.abort()

However, transaction.abort() does not seem to roll anything back. All subsequent Celery tasks on this worker fail with the following error:

This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback().

What am I doing wrong?
Better still, how would you write task_name_fn so that this issue does not occur?

Ranjith Ramachandra asked Aug 31 '16 15:08



1 Answer

First, you don't need to catch exceptions yourself to abort the transaction. The transaction.manager context manager already does that:

import transaction

@app.task(name='task_name')
def task_name_fn(*args, **kwargs):
    with transaction.manager:
        actual_fn(*args, **kwargs)

The context manager commits the transaction when the block exits normally and aborts it if an exception is raised.
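
To make the commit-or-abort behaviour concrete, here is a minimal standard-library sketch. FakeSession, managed and run_task are hypothetical names invented for illustration; this only mimics what a context manager such as transaction.manager does and is not its implementation. It also assumes the database session really takes part in the transaction (for example through an integration such as zope.sqlalchemy), which the question does not show.

```python
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a database session; it only records calls."""

    def __init__(self):
        self.events = []

    def commit(self):
        self.events.append("commit")

    def rollback(self):
        self.events.append("rollback")


@contextmanager
def managed(session):
    """Commit on normal exit; roll back and re-raise on any exception."""
    try:
        yield session
    except BaseException:
        session.rollback()
        raise
    else:
        session.commit()


def run_task(session, fn):
    # The task body needs no try/except of its own.
    with managed(session):
        return fn()


session = FakeSession()
run_task(session, lambda: "ok")  # exits normally, so the session commits

try:
    run_task(session, lambda: 1 / 0)  # raises, so the session rolls back
except ZeroDivisionError:
    pass

print(session.events)  # ['commit', 'rollback']
```

The point is that the rollback happens on the way out of the with block, so the next task starts from a clean state. The exception still propagates to the caller, which lets Celery mark the task as failed.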

Next, you could abstract that into the task decorator. Something like this (not tested, but it should probably work as is):

from functools import wraps
import transaction

def tm_task(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        with transaction.manager:
            return f(*args, **kwargs)
    return app.task()(decorated)

@tm_task
def actual_fn(*args, **kwargs):
    pass # your function code here instead of calling other function
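
A note on why @wraps matters in that decorator: as far as I know, Celery derives a task's default name from the wrapped function's module and name, so a wrapper that loses them would register every task under the wrapper's name. The sketch below uses only functools; with_wraps, without_wraps, send_email and resize_image are hypothetical names used for illustration.

```python
from functools import wraps


def with_wraps(f):
    @wraps(f)  # copies __name__, __module__, __doc__ and so on from f
    def decorated(*args, **kwargs):
        return f(*args, **kwargs)
    return decorated


def without_wraps(f):
    def decorated(*args, **kwargs):
        return f(*args, **kwargs)
    return decorated


@with_wraps
def send_email():
    pass


@without_wraps
def resize_image():
    pass


print(send_email.__name__)    # send_email
print(resize_image.__name__)  # decorated
```

If you would rather not rely on that, passing an explicit name to app.task, as in the question, sidesteps the issue.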

Also, since you are using transactions, you may want to delay queueing jobs until the transaction has committed. For example, if you insert a row inside the transaction and queue a job that works on that row, the job may reach a worker before the transaction is committed, when the row is not yet visible outside the transaction. Something like:

from functools import wraps

import transaction
from celery import Task

class AfterCommitTask(Task):
    def apply_async(self, *args, **kw):
        # Defer the real apply_async until the current transaction finishes.
        # Note: this returns None instead of an AsyncResult.
        tx = transaction.get()
        def hook(status):
            if status:  # Only queue if the transaction was successful.
                super(AfterCommitTask, self).apply_async(*args, **kw)
        tx.addAfterCommitHook(hook)

def tm_task(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        with transaction.manager:
            return f(*args, **kwargs)
    return app.task(base=AfterCommitTask)(decorated)

@tm_task
def actual_fn(*args, **kwargs):
    pass  # your function code here instead of calling another function
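
To illustrate the ordering this gives you, here is a small standard-library sketch of the after-commit idea. MiniTransaction, enqueue_after_commit and the broker list are hypothetical stand-ins; this is not the transaction package's implementation, only a model of hooks that receive a success flag once the commit has finished.

```python
class MiniTransaction:
    """Hypothetical, simplified stand-in for a transaction object."""

    def __init__(self, commit_fails=False):
        self._commit_fails = commit_fails
        self._hooks = []

    def add_after_commit_hook(self, hook):
        self._hooks.append(hook)

    def commit(self):
        # Hooks receive True if the commit succeeded, False otherwise.
        status = not self._commit_fails
        hooks, self._hooks = self._hooks, []
        for hook in hooks:
            hook(status)


broker = []  # stands in for the message queue


def enqueue_after_commit(tx, job):
    """Register the job now, but only publish it after a successful commit."""
    def hook(status):
        if status:
            broker.append(job)
    tx.add_after_commit_hook(hook)


tx = MiniTransaction()
enqueue_after_commit(tx, "process row 1")
queued_before_commit = list(broker)  # nothing is published yet
tx.commit()

failing_tx = MiniTransaction(commit_fails=True)
enqueue_after_commit(failing_tx, "process row 2")
failing_tx.commit()  # the hook runs with status False, so the job is dropped

print(queued_before_commit)  # []
print(broker)                # ['process row 1']
```

The worker can therefore never see a job for a row that was not committed. The trade-off is that a job queued outside any committing transaction would never be sent.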
Antoine Leclair answered Nov 15 '22 04:11
