I have a very simple Flask app example which uses a celery worker to process a task asynchronously:
app.py
app.config['CELERY_BROKER_URL'] = os.environ.get('REDISCLOUD_URL', 'redis://localhost:6379')
app.config['CELERY_RESULT_BACKEND']= os.environ.get('REDISCLOUD_URL', 'redis://localhost:6379')
app.config['SQLALCHEMY_DATABASE_URI'] = conn_str
celery = make_celery(app)
db.init_app(app)
@app.route('/')
def index():
return "Working"
@app.route('/test')
def test():
task = reverse.delay("hello")
return task.id
@celery.task(name='app.reverse')
def reverse(string):
return string[::-1]
if __name__ == "__main__":
app.run()
To run it locally, I run celery -A app.celery worker --loglevel=INFO in one terminal, and python app.py in another terminal.
I'm wondering how can I deploy this application on Google Cloud? I don't want to use Task Queues since it is only compatible with Python 2. Is there a good piece of documentation available for doing something like this? Thanks
App engine task queues is the previous version of Google Cloud Tasks, this has full support for App Engine Flex/STD and Python 3.x runtimes.
You need to create a Cloud Task Queue and an App engine service to handle the tasks
Gcloud command to create a queue
gcloud tasks queues create [QUEUE_ID]
Task handler code
from flask import Flask, request
app = Flask(__name__)
@app.route('/example_task_handler', methods=['POST'])
def example_task_handler():
"""Log the request payload."""
payload = request.get_data(as_text=True) or '(empty payload)'
print('Received task with payload: {}'.format(payload))
return 'Printed task payload: {}'.format(payload)
Code to push a task
"""Create a task for a given queue with an arbitrary payload."""
from google.cloud import tasks_v2
client = tasks_v2.CloudTasksClient()
# replace with your values.
# project = 'my-project-id'
# queue = 'my-appengine-queue'
# location = 'us-central1'
# payload = 'hello'
parent = client.queue_path(project, location, queue)
# Construct the request body.
task = {
'app_engine_http_request': { # Specify the type of request.
'http_method': tasks_v2.HttpMethod.POST,
'relative_uri': '/example_task_handler'
}
}
if payload is not None:
# The API expects a payload of type bytes.
converted_payload = payload.encode()
# Add the payload to the request.
task['app_engine_http_request']['body'] = converted_payload
if in_seconds is not None:
timestamp = datetime.datetime.utcnow() + datetime.timedelta(seconds=in_seconds)
# Add the timestamp to the tasks.
task['schedule_time'] = timestamp
# Use the client to build and send the task.
response = client.create_task(parent=parent, task=task)
print('Created task {}'.format(response.name))
return response
requirements.txt
Flask==1.1.2
gunicorn==20.0.4
google-cloud-tasks==2.0.0
You can check this full example in GCP Python examples Github page
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With