 

Best practice to run Prefect flow serverless in Google Cloud

I have started using Prefect for various projects and now need to decide which deployment strategy on GCP would work best. Preferably I would like to work serverless. Comparing Cloud Run, Cloud Functions and App Engine, I am inclined to go for the latter since it doesn't have a timeout limit, while the other two have timeouts of 9 and 15 minutes respectively.

I am interested to hear how people have deployed Prefect flows serverlessly, such that flows are scheduled/triggered for batch processing, while the agent is automatically scaled down when not used.

Alternatively, a more classic approach would be to deploy Prefect on Compute Engine and schedule this via Cloud Scheduler. But I feel this is somewhat outdated and doesn't do justice to the functionality of Prefect and flexibility for future development.

asked Apr 24 '20 by dkapitan



2 Answers

I am interested to hear how people have deployed Prefect flows serverlessly, such that flows are scheduled/triggered for batch processing, while the agent is automatically scaled down when not used.

Prefect has a blog post on serverless deployment with AWS Lambda which is a good blueprint for doing the same with GCP. The challenge here is the agent scaling - agents work by polling the backend (whether a self-hosted deployment of Prefect Server or the hosted Prefect Cloud) on a regular basis (every ~10 secs). One possibility that comes to mind would be to use a Cloud Function to spin up an agent in-process, triggered by whatever batch processing/scheduling event you're thinking of. You can also use the --max-polls CLI argument or kwarg when spinning up the agent to look for runs; it'll tear itself down if it doesn't find anything after however many polling attempts you specify. Details on that here or on any of the specific agent pages.
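For example, a minimal sketch of that pattern, assuming a Prefect 0.x/1.x LocalAgent and an HTTP-triggered Cloud Function (the function name and the "gcf" label are illustrative, not from the answer):

# Sketch only: an HTTP-triggered Cloud Function that runs a short-lived agent
# in-process; the "gcf" label and the function name are made-up examples.
from prefect.agent.local import LocalAgent

def poll_for_runs(request):
    # Poll the backend a handful of times, pick up any scheduled runs,
    # then exit so the function can return and scale back down to zero.
    LocalAgent(labels=["gcf"], max_polls=5).start()
    return "done polling"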

However, this could be inefficient for long-running flows and you might hit resource caps; it might be worthwhile to look at triggering an auto-scaling Dask cluster deployment if the workloads are high enough. Prefect supports that natively with Kubernetes, and has a Kubernetes agent to interact with your cluster. I think this would be the most elegant and scalable solution without having to go the classic Compute Engine route, which I agree is somewhat dated and doesn't provide great auto-scaling or first-class management.
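As a rough sketch of what that could look like (assuming Prefect 0.14+/1.x; the image name, project name, and scaling bounds below are placeholders):

# Sketch only: run the flow as a Kubernetes job and fan its tasks out to an
# ephemeral, auto-scaling Dask cluster. Image, project, and bounds are placeholders.
from prefect import Flow
from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor

with Flow("batch-flow") as flow:
    ...  # your tasks here

flow.run_config = KubernetesRun(image="gcr.io/my-project/my-flow:latest")
flow.executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",  # Dask workers as pods
    adapt_kwargs={"minimum": 1, "maximum": 10},   # auto-scale worker count
)
flow.register(project_name="my-project")

A Kubernetes agent running in the cluster (prefect agent kubernetes start in recent releases) then picks up the scheduled runs.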

Better support for serverless execution is on the roadmap; specifically, a serverless agent is in the works, but I don't have an ETA for when that'll be released.

Hopefully that helps! :)

answered Sep 20 '22 by nicholas

The full explanation is here: https://jerryan.medium.com/hacking-ways-to-run-prefect-flow-serverless-in-google-cloud-function-bc6b249126e4.

Basically, there are two hacky ways to solve the problem.

  • Use Google Cloud Storage to persist task states automatically
  • Publish the previous execution results of a Cloud Function to its subsequent execution.

Caching and Persisting Data

By default, Prefect Core stores all data, results, and cached states in memory within the Python process running the flow. However, they can be persisted to and retrieved from external locations if the necessary hooks are configured.

Prefect has a notion of "checkpointing" that ensures that every time a task runs successfully, its return value is written to persistent storage, based on the result object and target configured for the task.

@task(result=LocalResult(dir="~/.prefect"), target="task.txt") 
def func_task():
    return 99

The complete code example is shown below. Here we write to and read from a Google Cloud Storage bucket by using GCSResult.

import os
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"
from prefect import task, Flow  
from prefect.engine.results import LocalResult, GCSResult

@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task1():
    print("Task 1")
    return "Task 1"

@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task2():
    print("Task 2")
    return "Task 2"

@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task3():
    print("Task 3")
    return "Task 3"

@task(target="{date:%Y-%m-%d}/{task_name}.txt")
def task4():
    print("Task 4")

@task
def task5():
    print("Task 5")

@task
def task6():
    print("Task 6")

@task
def task7():
    print("Task 7")

@task
def task8():
    print("Task 8")
    

# with Flow("This is My First Flow",result=LocalResult(dir="~/prefect")) as flow:
with Flow("this is my first flow", result=GCSResult(bucket="prefect")) as flow:
    t1, t2 = task1(), task2()
    t3 = task3(upstream_tasks=[t1,t2])
    t4 = task4(upstream_tasks=[t3])
    t5 = task5(upstream_tasks=[t4])
    t6 = task6(upstream_tasks=[t4])
    t7 = task7(upstream_tasks=[t2,t6])
    t8 = task8(upstream_tasks=[t2,t3])

# run the whole flow
flow_state = flow.run()

# visualize the flow
flow.visualize(flow_state)

# print the state of the flow
print(flow_state.result)

Publish Execution Results

Another hacky solution is to publish the previous execution results of a Google Cloud Function to its subsequent execution. Here, we assume there is no data input/output dependence between tasks.

Some modifications are needed to make it happen.

  • Add custom state handlers for the tasks
  • Manually change task state before publishing
  • Encode/Decode the task state

First, we know the flow.run function finishes only after all tasks have entered a finished state, whether success or failure. However, we don't want all tasks to run inside a single call of the Google Cloud Function because the total run time may exceed 540 seconds.

So a custom state handler is attached to each task. Every time a task finishes, we emit an ENDRUN signal to the Prefect framework, which then sets the state of the remaining tasks to Cancelled.

from prefect import task, Flow, Task
from prefect.engine.runner import ENDRUN
from prefect.engine.state import State, Cancelled

num_finished = 0

def my_state_handler(obj, old_state, new_state):
    global num_finished
    if num_finished >= 1:
        raise ENDRUN(state=Cancelled("Flow run is cancelled"))

    if new_state.is_finished():  
        num_finished += 1
    return new_state

Second, to make tasks with Cancelled status execute correctly next time, we must manually change their state to Pending.

from typing import Dict
from prefect import Task
from prefect.engine.state import State, Cancelled, Pending

def run(task_state_dict: Dict[Task, State]) -> Dict[Task, State]:
    flow_state = flow.run(task_states=task_state_dict)
    task_states = flow_state.result

    # change task state before the next publish
    for t in task_states:
        if isinstance(task_states[t], Cancelled):
            task_states[t] = Pending("Mocked pending")

    # reset the global counter used by the state handler
    global num_finished
    num_finished = 0

    # task states for the next run
    return task_states

Third, there are two essential functions: encode_data and decode_data. The former serializes the task states so they are ready to be published, and the latter deserializes the task states for the flow object.

# encoding: utf-8
from typing import List, Dict, Any
from prefect.engine.state import State
from prefect import Flow, Task


def decode_data(flow: Flow, data: List[Dict[str, Any]]) -> Dict[Task, State]:
    # data as follows:
    # [
    #     {
    #         "task": {
    #             "slug": "task1"
    #         },
    #         "state": {
    #             "type": "Success",
    #             "message": "Task run succeeded(manually set)"
    #         }
    #     }
    # ]

    task_states = {}
    for d in data:

        tasks_found = flow.get_tasks(d['task']['slug'])
        if len(tasks_found) != 1:  # skip if the slug does not match exactly one task
            continue

        state = State.deserialize(
            {"message": d['state']['message'],
             "type": d['state']['type']
             }
        )
        task_states[tasks_found[0]] = state

    return task_states


def encode_data(task_states: Dict[Task, State]) -> List[Dict[str, Any]]:
    data = []
    for task, state in task_states.items():
        data.append({
            "task": task.serialize(),
            "state": state.serialize()
        })
    return data

Last but not least, the orchestration connects all the parts above.

def main(data: List[Dict[str, Any]], *args, **kargs) -> List[Dict[str, Any]]:
    task_states = decode_data(flow, data)
    task_states = run(task_states)
    return encode_data(task_states)

import sys
from collections import defaultdict

if __name__ == "__main__":
    evt = []

    while True:
        data = main(evt)

        states = defaultdict(set)
        for task in data:
            task_type, slug = task['state']['type'], task['task']['slug']
            states[task_type].add(slug)
        if len(states['Pending']) == 0:
            sys.exit(0)

        evt = data
        
        # send pubsub message here
        # GooglePubsub().publish(evt)
        # sys.exit(0)
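To wire the loop above into Cloud Functions, a minimal sketch of a Pub/Sub-triggered entry point could look like the following (assuming main, encode_data/decode_data, and the flow from the snippets above live in the same module; the project ID, topic name, and function name are placeholders, not from the article):

# Sketch only: a Pub/Sub-triggered background Cloud Function. The project ID,
# topic name, and function name are illustrative assumptions.
import base64
import json

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "prefect-flow-state")

def handle_pubsub(event, context):
    # Cloud Functions delivers the Pub/Sub payload base64-encoded in event["data"]
    payload = json.loads(base64.b64decode(event["data"]).decode("utf-8")) if event.get("data") else []

    data = main(payload)

    # republish the updated task states; stop once no task is left Pending
    if any(t["state"]["type"] == "Pending" for t in data):
        publisher.publish(topic_path, json.dumps(data).encode("utf-8"))

Each invocation runs one task, cancels the rest, and hands the updated states to the next invocation, so no single call has to stay within the 540-second limit for the whole flow.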
answered Sep 19 '22 by Jerry An