I'm using Celery + Redis with a Django REST API on my localhost to run a classification task that gets its data from an Axios POST. I'm now trying to deploy it to Google Cloud, and I haven't found a clear way to run Redis and Celery on App Engine. I heard about Google Task Queue, but I haven't found a way to add it to a view and trigger it when the view is called. How can I create a function that runs the task I currently have in Celery as a Google Cloud task, or at least get an idea of how to do it? This is my code:
from celery import shared_task
from celery_progress.backend import ProgressRecorder
from snakeimage.models import Prediction, UploadedSnake, SnakeClass
from snakeimage.classification_codes.classification_codes.prediction_func import predict_classes
#import json
#import time
#from django.conf import settings
#from google.cloud import tasks_v2beta3
#from google.protobuf import timestamp_pb2

@shared_task(bind=True)
def image_progress(self, image_path, X, Y, metadata, image_id):
    progress_recorder = ProgressRecorder(self)
    predictions = predict_classes(image_path, X, Y, metadata)
    print(predictions)
    for prediction in predictions:
        print(prediction[0])
        image = UploadedSnake.objects.get(id=image_id)
        class_name = SnakeClass.objects.get(index=(prediction[0] + 1))
        print('>>>>>>>>>>>>>>>>>>>>>', prediction[1])
        Prediction.objects.create(image=image, class_name=class_name, predict_percent=prediction[1])
    progress_recorder.set_progress(1, 3, description='Prediction Result Status')
    return True
and I call it in the view with:
task = image_progress.delay(image_path=image_path, X=X, Y=Y, metadata=0, image_id=image_id)
Thanks for the help.
[EDIT 1]
Sorry for the late reply. I got it working locally with django-cloud-tasks, but it does not work on the staging server. When I try to connect remotely from my machine to the Google Cloud Tasks queue that I created, I get the following error, and it retries the request 7 times:
googleapiclient.errors.HttpError: <HttpError 403 when requesting https://cloudtasks.googleapis.com/v2beta3/projects/%7Bdeployement-test%7D/locations/europe-west6/queues/default/tasks?alt=json returned "Permission denied on resource project {deployement-test}.". Details: "[{'@type': 'type.googleapis.com/google.rpc.Help', 'links': [{'description': 'Google developer console API key', 'url': 'https://console.developers.google.com/project/{deployement-test}/apiui/credential'}]}, {'@type': 'type.googleapis.com/google.rpc.ErrorInfo', 'reason': 'CONSUMER_INVALID', 'domain': 'googleapis.com', 'metadata': {'consumer': 'projects/{deployement-test}', 'service': 'cloudtasks.googleapis.com'}}]">
I did everything as described on GitHub, and there is no option to add an API key for authentication. Does anyone know how to resolve this issue? This is my code. settings:
DJANGO_CLOUD_TASKS_EXECUTE_LOCALLY = False
# If False, running `.execute()` on remote task will simply log the task data instead of adding it to
# the queue. Useful for debugging. Default: True
DJANGO_CLOUD_TASKS_BLOCK_REMOTE_TASKS = True
PROJECT_NAME = "project"
QUEUE_REGION = "region"
QUEUE_NAME = "queue"
DJANGO_CLOUD_TASKS_HANDLER_SECRET = 'random secret key'
DJANGO_CLOUD_TASKS = {
    'project_location_name': 'projects/{project}/locations/region',
    'task_handler_root_url': '/_tasks/',
}
task.py:
from celery import shared_task
#from celery_progress.backend import ProgressRecorder
from snakeimage.models import Prediction, UploadedSnake, SnakeClass
from snakeimage.classification_codes.classification_codes.prediction_func import predict_classes
#from AINature.settings import DJANGO_HANDLER_SECRET
import json
import time
from django.conf import settings
from google.cloud import tasks_v2beta3
from google.protobuf import timestamp_pb2
from django_cloud_tasks.decorators import task

@task(queue='default')
def example_task(request, p1, p2):
    print(p1, p2)
    print("lezgooow >>>>>>>>>>>>>>>" + p1)
    print(request.task_id)

def prediction_task(request, image_path, X, Y, metadata, image_id):
    print("what is going ooon")
    #progress_recorder = ProgressRecorder(self, )
    predictions = predict_classes(image_path, X, Y, metadata)
    print(predictions)
    for prediction in predictions:
        print(prediction[0])
        image = UploadedSnake.objects.get(id=image_id)
        class_name = SnakeClass.objects.get(index=(prediction[0] + 1))
        print('>>>>>>>>>>>>>>>>>>>>>', prediction[1])
        # Store the class and its score (second element of the prediction)
        Prediction.objects.create(image=image, class_name=class_name,
                                  predict_percent=prediction[1])
    #progress_recorder.set_progress(1, 3, description='Prediction Result Status')
    return True
When I set DJANGO_CLOUD_TASKS_EXECUTE_LOCALLY = True, everything runs correctly, but when I turn it off, it throws the error I mentioned above. This is the link to the django-cloud-tasks GitHub repository: this link
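One detail in the error above may be worth checking, offered as an observation rather than a confirmed diagnosis: the request URL contains `%7Bdeployement-test%7D`, which is the URL-encoded form of `{deployement-test}`, so the curly braces appear to have been sent as part of the project ID. The sketch below builds the `project_location_name` value from plain IDs and rejects leftover placeholder braces. The helper name is hypothetical, and the IDs are the ones visible in the error message.

```python
from urllib.parse import unquote


def project_location_name(project_id, region):
    """Return 'projects/<id>/locations/<region>' (hypothetical helper).

    Raises ValueError if an ID is empty or still contains template braces.
    """
    for value in (project_id, region):
        if not value or "{" in value or "}" in value:
            raise ValueError(f"invalid ID: {value!r}")
    return f"projects/{project_id}/locations/{region}"


# Decoding the path segment from the failing URL shows the literal braces.
decoded = unquote("%7Bdeployement-test%7D")
print(decoded)

# The same IDs without braces give a well-formed location name.
print(project_location_name("deployement-test", "europe-west6"))
```

If this is the cause, the value in `DJANGO_CLOUD_TASKS` would need the real project ID and region in place of `{project}` and `region`.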
Celery and Cloud Tasks are both task queues, but they are implemented differently, so there is no direct way to convert your Celery logic to Cloud Tasks. It would be easier to use the Cloud Tasks service on its own. I suggest studying the Cloud Tasks client libraries before doing the migration. There are samples on GitHub to get you started.
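As a rough illustration of using the Cloud Tasks client library directly, the sketch below builds an App Engine task whose JSON body carries the same arguments as `image_progress`, then enqueues it. The handler path `/_tasks/predict/`, both function names, and the project, region and queue values are assumptions for illustration. It also assumes that the `google-cloud-tasks` package (2.x) is installed, that credentials are available in the environment, and that `X`, `Y` and `metadata` are JSON-serializable.

```python
import json


def build_prediction_task(image_path, X, Y, metadata, image_id,
                          relative_uri="/_tasks/predict/"):
    """Build the task dict for an App Engine HTTP task.

    relative_uri is a hypothetical handler URL in the same app.
    """
    payload = {
        "image_path": image_path,
        "X": X,
        "Y": Y,
        "metadata": metadata,
        "image_id": image_id,
    }
    return {
        "app_engine_http_request": {
            "relative_uri": relative_uri,
            "headers": {"Content-Type": "application/json"},
            # The task body must be bytes.
            "body": json.dumps(payload).encode("utf-8"),
        }
    }


def enqueue_prediction(project, location, queue, **task_kwargs):
    """Send the task to an existing Cloud Tasks queue."""
    # Imported here so build_prediction_task works without the package.
    from google.cloud import tasks_v2

    client = tasks_v2.CloudTasksClient()
    parent = client.queue_path(project, location, queue)
    task = build_prediction_task(**task_kwargs)
    task["app_engine_http_request"]["http_method"] = tasks_v2.HttpMethod.POST
    return client.create_task(request={"parent": parent, "task": task})
```

In the view, the call to `image_progress.delay(...)` could then become something like `enqueue_prediction("my-project", "europe-west6", "default", image_path=image_path, X=X, Y=Y, metadata=0, image_id=image_id)`, where `"my-project"` is a placeholder.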
If you still want to use Celery, you will need to work out how to trigger the workers via HTTP requests, because App Engine Standard only accepts HTTP requests.
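Whichever queue you choose, on App Engine Standard the work ends up running inside an ordinary HTTP handler. The sketch below shows only the framework-independent part of such a handler: it checks the `X-AppEngine-QueueName` header, which App Engine adds to requests dispatched by Cloud Tasks, and decodes the JSON body back into keyword arguments. The function name and the payload keys are assumptions that mirror the arguments of `image_progress`.

```python
import json

# Keys the handler expects in the task body (assumed, mirrors image_progress).
EXPECTED_KEYS = ("image_path", "X", "Y", "metadata", "image_id")


def parse_task_request(headers, body):
    """Validate a task request and return the keyword arguments it carries.

    headers: mapping of request headers. Django's request.headers is
             case-insensitive; a plain dict needs the exact header name.
    body:    raw request body as bytes or str.
    """
    if not headers.get("X-AppEngine-QueueName"):
        raise PermissionError("request did not come from a Cloud Tasks queue")
    payload = json.loads(body)
    missing = [key for key in EXPECTED_KEYS if key not in payload]
    if missing:
        raise ValueError(f"missing keys in task body: {missing}")
    return {key: payload[key] for key in EXPECTED_KEYS}
```

In a Django view this could be called as `kwargs = parse_task_request(request.headers, request.body)`, followed by the prediction code from the question with `**kwargs`.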
There are other services you can use, such as Compute Engine and App Engine Flex, where you can implement whatever setup you want.