
How can I convert a Celery task to a Google Cloud Task on GCP (Django)?

I'm using Celery + Redis with a Django REST API on my localhost to run a classification task that gets its data from an Axios POST. Right now I'm trying to deploy it to Google Cloud, but I haven't found a clear way to run Redis and Celery on App Engine. I heard about Google Cloud Tasks, but I haven't found a way to add it to a view and trigger it when the view is called. So how can I create a function that calls a Google Cloud Task to do the work I currently have in Celery, or at least get an idea of how to do it? These are my codes:

from celery import shared_task
from celery_progress.backend import ProgressRecorder

from snakeimage.models import Prediction, UploadedSnake, SnakeClass
from snakeimage.classification_codes.classification_codes.prediction_func import predict_classes

#import json
#import time

#from django.conf import settings

#from google.cloud import tasks_v2beta3
#from google.protobuf import timestamp_pb2

@shared_task(bind=True)
def image_progress(self, image_path, X, Y, metadata, image_id):
    progress_recorder = ProgressRecorder(self)
    predictions = predict_classes(image_path, X, Y, metadata)
    print(predictions)
    for prediction in predictions:
        print(prediction[0])
        image = UploadedSnake.objects.get(id=image_id)
        class_name = SnakeClass.objects.get(index=(prediction[0] + 1))
        print('>>>>>>>>>>>>>>>>>>>>>', prediction[1])
        Prediction.objects.create(image=image, class_name=class_name, predict_percent=prediction[1])
    progress_recorder.set_progress(1, 3, description='Prediction Result Status')
    return True

and I call it in the view with:

task = image_progress.delay(image_path=image_path, X=X, Y=Y, metadata=0, image_id=image_id)

Thanks for the help.

[EDIT 1]

Sorry for the late reply. I was able to make it work locally with django-cloud-tasks, but on the staging server it is not working. When I try to connect remotely from my machine to the Google Cloud Tasks queue that I created, I get this error, and it retries sending it 7 times:

googleapiclient.errors.HttpError: <HttpError 403 when requesting https://cloudtasks.googleapis.com/v2beta3/projects/%7Bdeployement-test%7D/locations/europe-west6/queues/default/tasks?alt=json returned "Permission denied on resource project {deployement-test}.". Details: "[{'@type': 'type.googleapis.com/google.rpc.Help', 'links': [{'description': 'Google developer console API key', 'url': 'https://console.developers.google.com/project/{deployement-test}/apiui/credential'}]}, {'@type': 'type.googleapis.com/google.rpc.ErrorInfo', 'reason': 'CONSUMER_INVALID', 'domain': 'googleapis.com', 'metadata': {'consumer': 'projects/{deployement-test}', 'service': 'cloudtasks.googleapis.com'}}]">

I did everything as mentioned on the GitHub page, and there is no option to add an API key for authentication. Does someone know how to resolve this issue? These are my codes. settings:

DJANGO_CLOUD_TASKS_EXECUTE_LOCALLY = False
# If False, running `.execute()` on a remote task will simply log the
# task data instead of adding it to the queue.
# Useful for debugging. Default: True
DJANGO_CLOUD_TASKS_BLOCK_REMOTE_TASKS = True

PROJECT_NAME = "project"
QUEUE_REGION = "region"
QUEUE_NAME = "queue"

DJANGO_CLOUD_TASKS_HANDLER_SECRET = 'random secret key'

DJANGO_CLOUD_TASKS = {
    'project_location_name': 'projects/{project}/locations/region',
    'task_handler_root_url': '/_tasks/',
}
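
For reference, in the error above the project appears in the request URL as the literal, URL-encoded string %7Bdeployement-test%7D, i.e. {deployement-test} with braces, which suggests the placeholders in project_location_name are not being replaced with real values. Below is a minimal sketch of the setting with concrete values substituted; deployement-test and europe-west6 are only the values visible in the error URL and are used here as placeholders, not confirmed project/region names:

# settings.py -- sketch with the placeholders resolved; values are illustrative
PROJECT_NAME = "deployement-test"    # the actual GCP project ID, without braces
QUEUE_REGION = "europe-west6"        # the region the queue was created in
QUEUE_NAME = "default"

DJANGO_CLOUD_TASKS = {
    # resolves to 'projects/deployement-test/locations/europe-west6'
    'project_location_name': 'projects/{}/locations/{}'.format(PROJECT_NAME, QUEUE_REGION),
    'task_handler_root_url': '/_tasks/',
}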

task.py:

from celery import shared_task
#from celery_progress.backend import ProgressRecorder

from snakeimage.models import Prediction, UploadedSnake, SnakeClass
from snakeimage.classification_codes.classification_codes.prediction_func import predict_classes
#from AINature.settings import DJANGO_HANDLER_SECRET

import json
import time

from django.conf import settings

from google.cloud import tasks_v2beta3
from google.protobuf import timestamp_pb2

from django_cloud_tasks.decorators import task

@task(queue='default')
def example_task(request, p1, p2):
    print(p1, p2)
    print("lezgooow >>>>>>>>>>>>>>>" + p1)
    print(request.task_id)

@task(queue='default')
def prediction_task(request, image_path, X, Y, metadata, image_id):
    print("what is going ooon")
    #progress_recorder = ProgressRecorder(self, )
    predictions = predict_classes(image_path, X, Y, metadata)
    print(predictions)
    for prediction in predictions:
        print(prediction[0])
        image = UploadedSnake.objects.get(id=image_id)
        class_name = SnakeClass.objects.get(index=(prediction[0] + 1))
        print('>>>>>>>>>>>>>>>>>>>>>', prediction[1])
        Prediction.objects.create(image=image, class_name=class_name, predict_percent=prediction[1])
    #progress_recorder.set_progress(1, 3, description='Prediction Result Status')
    return True

When I set DJANGO_CLOUD_TASKS_EXECUTE_LOCALLY = True, everything runs correctly, but when I turn it off, it throws the error mentioned above. This is a link to the django-cloud-tasks GitHub: this link
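
For context, the task is pushed from a view with .execute(), roughly like this. This is only a minimal sketch following the django-cloud-tasks README; the view name, the request parsing, the import path of the task module, and the X/Y values are made up for illustration:

# views.py -- illustrative sketch; only prediction_task comes from the task.py above,
# and its import path is assumed
from django.http import JsonResponse
from snakeimage.task import prediction_task

def classify_view(request):
    image_path = request.POST.get("image_path")
    image_id = request.POST.get("image_id")
    # With DJANGO_CLOUD_TASKS_EXECUTE_LOCALLY = True, .execute() runs the function
    # in-process; with it set to False, it should push the task to the Cloud Tasks queue.
    prediction_task(image_path=image_path, X=224, Y=224, metadata=0,
                    image_id=image_id).execute()
    return JsonResponse({"status": "queued"})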

Asked Sep 13 '25 by Ilyes Negadi


1 Answer

Celery and Cloud Tasks are both task queues, but they have different implementations, so there is no direct way to convert your Celery logic to Cloud Tasks. That means it would be easier if you used the Cloud Tasks service only. I suggest studying the Cloud Tasks client libraries before doing the migration. There are samples in the GitHub link to get you started.
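
To illustrate, here is a minimal sketch of enqueueing an App Engine task with the google-cloud-tasks client library (assuming google-cloud-tasks 2.x; the project ID, region, queue name, handler URL, and payload are placeholders, not values from the question):

# enqueue_prediction.py -- minimal sketch, values are placeholders
import json
from google.cloud import tasks_v2

client = tasks_v2.CloudTasksClient()

# Replace with the real project ID, region, and queue name.
parent = client.queue_path("my-project-id", "europe-west6", "default")

payload = {"image_id": 42, "image_path": "/tmp/snake.jpg"}

task = {
    "app_engine_http_request": {
        "http_method": tasks_v2.HttpMethod.POST,
        "relative_uri": "/_tasks/predict/",   # a Django view that does the actual work
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(payload).encode(),
    }
}

# Cloud Tasks will later POST the body to relative_uri of the App Engine app.
response = client.create_task(parent=parent, task=task)
print("Created task:", response.name)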

If you still want to use Celery, you will need to work out how to trigger the workers via HTTP requests, because App Engine Standard only accepts HTTP requests.
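
As a rough illustration of that HTTP-triggered pattern (a hedged sketch, not part of any library: the view name, URL, and payload fields are assumptions, only predict_classes comes from the question), the work the Celery worker used to do would sit behind a plain Django view that Cloud Tasks, or any other HTTP client, can POST to:

# handlers.py -- hypothetical task-handler view; field names are assumptions
import json
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt

from snakeimage.classification_codes.classification_codes.prediction_func import predict_classes

@csrf_exempt
def predict_handler(request):
    data = json.loads(request.body)
    # Run the classification synchronously inside the request that
    # Cloud Tasks (or another service) sends to this endpoint.
    predictions = predict_classes(data["image_path"], data["X"], data["Y"], data["metadata"])
    return JsonResponse({"predictions": [list(p) for p in predictions]})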

There are other service options you can use, such as Compute Engine and App Engine Flexible, where you can implement whatever setup you want.

Answered Sep 16 '25 by JM Gelilio