Celery + SQS is receiving same task twice, with same task id at same time

I am using Celery with SQS in a Flask app, but Celery is receiving the same task twice, with the same task id, at the same time.

I am running the worker like this:
celery worker -A app.jobs.run -l info --pidfile=/var/run/celery/celery.pid --logfile=/var/log/celery/celery.log --time-limit=7200 --concurrency=8

Here are the Celery logs:

[2019-11-29 08:07:35,464: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]  
[2019-11-29 08:07:35,465: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]  
[2019-11-29 08:07:35,471: WARNING/ForkPoolWorker-4] in booking funtion1
[2019-11-29 08:07:35,473: WARNING/ForkPoolWorker-3] in booking funtion1
[2019-11-29 08:07:35,537: WARNING/ForkPoolWorker-3] book_request_pp
[2019-11-29 08:07:35,543: WARNING/ForkPoolWorker-4] book_request_pp

The same task is received twice, and both copies run simultaneously.

I am using celery==4.4.0rc4, boto3==1.9.232 and kombu==4.6.6 with SQS in a Python Flask app.
In SQS, the queue's Default Visibility Timeout is 30 minutes, my task has no ETA, and late acknowledgement (acks_late) is not enabled.
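For background, SQS is an at-least-once delivery system: a message whose visibility timeout expires before the worker acknowledges it is redelivered with the same task id. A defensive mitigation is to make the task idempotent on its task id. A minimal sketch of that check (the `run_once` helper is hypothetical, and the in-process set stands in for what would need to be a shared store in production, e.g. Redis or a DB row with a unique constraint):

```python
# Hypothetical idempotency guard for duplicate SQS deliveries.
# NOTE: an in-process set only illustrates the idea; with multiple worker
# processes the "seen" record must live in a shared store.
_seen_task_ids = set()

def run_once(task_id, fn, *args, **kwargs):
    """Run fn only the first time a given task id is seen; skip duplicates."""
    if task_id in _seen_task_ids:
        return None  # duplicate delivery; do nothing
    _seen_task_ids.add(task_id)
    return fn(*args, **kwargs)
```

Inside the task body this would be called as `run_once(capp.current_task.request.id, do_booking, request_data)`, so the second delivery of the same task id becomes a no-op.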

My task.py:

from flask import request
from flask_sqlalchemy import SQLAlchemy
from app import app as flask_app
from app.jobs.run import capp
db = SQLAlchemy(flask_app)

class BookingTasks:
    def addBookingToTask(self):
        request_data = request.json
        print ('in addBookingToTask',request_data['request_id'])
        print (request_data)
        bookFlightTask.delay(request_data)
        return 'addBookingToTask added'

@capp.task(max_retries=0)
def bookFlightTask(request_data):
    task_id = capp.current_task.request.id
    try:
        print ('in booking funtion1')
        # ... (rest of the task body omitted)

My config file, config.py:

import os
from urllib.parse import quote_plus

aws_access_key = quote_plus(os.getenv('AWS_ACCESS_KEY'))
aws_secret_key = quote_plus(os.getenv('AWS_SECRET_KEY'))

broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)
imports = ('app.jobs.run',)


## Using the database to store task state and results.
result_backend = 'db' + '+' + os.getenv('SQLALCHEMY_DATABASE_URI')

And lastly my Celery app file, run.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from flask import Flask
from app import app as flask_app
import sqlalchemy
capp = Celery()

capp.config_from_object('app.jobs.config')

# Optional configuration, see the application user guide.
capp.conf.update(
    result_expires=3600,
)
 
# SQS_QUEUE_NAME is like 'celery_test.fifo' , .fifo is required
capp.conf.task_default_queue = os.getenv('FLIGHT_BOOKINNG_SQS_QUEUE_NAME')
if __name__ == '__main__':
    capp.start()
asked Dec 01 '19 by ssnitish


1 Answer

The default SQS visibility timeout is 30 seconds. You need to set the Celery config value `broker_transport_options = {'visibility_timeout': 3600}`.

When Celery creates the queue, it will set the visibility timeout to 1 hour (3600 seconds).

NOTE: If you specify task_default_queue and the queue was already created without `broker_transport_options = {'visibility_timeout': 3600}`, Celery will not update the visibility timeout of the existing queue when restarted with that option. You will need to delete the queue and let Celery recreate it.
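Assuming the queue is deleted and recreated as described above, the fix is a one-line addition to the question's config.py (the rest of the file stays as-is):

```python
# In config.py: tell the SQS transport to create the queue with a 1-hour
# visibility timeout, so a long-running task is not redelivered mid-run.
broker_transport_options = {
    'visibility_timeout': 3600,  # seconds; choose a value longer than your slowest task
}
```

With the worker's `--time-limit=7200` from the question, a value of 7200 or more may be safer, since a task that outlives the visibility timeout will be delivered again.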

answered Oct 26 '22 by Aaron J