I am trying to retry a Celery task if it fails. This is my task:
    @shared_task(queue='es', bind=True, max_retries=3)
    def update_station_es_index(doc_id, doc_body):
        """
        Celery consumer method for updating ES.
        """
        try:
            # es_client is connecting to ElasticSearch
            es_client.update_stations(id=doc_id, body=doc_body)
        except Exception as e:
            self.retry(exc=e)
But I get this error when the task is called:
TypeError: update_station_es_index() takes 2 positional arguments but 3 were given
I have not found much help on the web for this error, only this GitHub issue, and that does not explain much.
Can anyone tell me what is happening and what is the solution here?
I am using Django 2.0.5 and Celery 4.2.0.
You must add self as an argument. When you specify bind=True, the task itself will be passed as the first argument.
Suppose you have a standard task add which takes two arguments:
    @shared_task
    def add(x, y):
        return x + y
If you specify bind=True on this task, it will need to accept another argument:
    @shared_task(bind=True)
    def add(self, x, y):
        # ...
So change

    def update_station_es_index(doc_id, doc_body):

to

    def update_station_es_index(self, doc_id, doc_body):
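To see where the third positional argument in the error comes from, here is a minimal plain-Python sketch of the binding behaviour. It is not Celery's implementation: FakeTask and fake_shared_task are hypothetical stand-ins that only mimic "pass the task as the first argument when bind=True", so the snippet runs without Celery or a broker.

```python
class FakeTask:
    """Hypothetical stand-in for a task object; not Celery's real Task class."""

    def __init__(self, func, bind):
        self.func = func
        self.bind = bind

    def __call__(self, *args, **kwargs):
        if self.bind:
            # With bind=True, the task object is prepended to the arguments,
            # so the wrapped function receives one more positional argument.
            return self.func(self, *args, **kwargs)
        return self.func(*args, **kwargs)


def fake_shared_task(bind=False):
    """Hypothetical decorator factory mimicking shared_task(bind=...)."""
    def decorator(func):
        return FakeTask(func, bind)
    return decorator


@fake_shared_task(bind=True)
def broken(doc_id, doc_body):  # no `self`, as in the question
    return doc_id, doc_body


@fake_shared_task(bind=True)
def fixed(self, doc_id, doc_body):  # `self` receives the task object
    return doc_id, doc_body


# Two arguments are supplied by the caller, but three reach `broken`.
try:
    broken("42", {"name": "station"})
    error_message = None
except TypeError as exc:
    error_message = str(exc)

result = fixed("42", {"name": "station"})
```

Calling broken raises the same kind of TypeError as in the question, while fixed returns normally because its signature has room for the extra first argument. In a real bound Celery task, that first argument is what makes self.retry(exc=e) available inside the function body.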