Automatically register new prefect flows?

Is there a mechanism to automatically register flows/new flows if a local agent is running, without having to manually run e.g. flow.register(...) on each one?

In Airflow, I believe there is a process that regularly scans the specified Airflow home folder for files with "dag" in the name, then searches them for DAG objects. If it finds any, it loads them so they are accessible through the UI without having to manually 'register' them.

Does something similar exist for Prefect? For example, if I just created the following file test_flow.py, without necessarily running it or adding flow.run_agent(), is there a way for it to be magically registered and accessible through the UI :) simply by existing in the proper place?

# prefect_home_folder/test_flow.py
import prefect
from prefect import task, Flow

@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud!")

flow = Flow("hello-flow", tasks=[hello_task])

flow.register(project_name='main')

I could write a script that behaves like the Airflow process, scanning a folder and registering flows at regular intervals, but I wonder if that's a bit hacky, or if there is a better solution and I'm just thinking too much in terms of Airflow?

evariste galois asked Nov 23 '20 17:11



1 Answer

Great question (and awesome username!) - in short, I suggest you are thinking too much in terms of Airflow. There are a few reasons this is not currently available in Prefect:

  • explicit is better than implicit
  • Prefect flows are not constrained to live in one place, nor to share the same runtime environment; this makes both automatically discovering a flow and re-serializing it complicated for a single agent process (which is not required to share the runtime environment of the flows it submits)
  • agents are better thought of as being parametrized by deployment infrastructure, not flow storage

Ideally for production workflows you'd use a CI/CD process so that anytime you make a code change an automatic job is triggered that re-registers the flow. A few comments that may be helpful:

  • you don't actually need to re-register the flow for every possible code change; for example, if you changed the message that your hello_task logs in your example, you could simply re-save the flow to its original location (what this looks like depends on the type of storage you use). Ultimately you only need to re-register if any of the metadata about your flow changes (retry settings, task names, dependency relationships, etc.)
  • you can use flow.register("My Project", idempotency_key=flow.serialized_hash()) to automatically capture this; this pattern will only register a new version if the flow's backend representation changes in some way
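
The CI/CD suggestion above could be sketched as a small script that a pipeline runs on every code change. This is only a minimal sketch under stated assumptions: the helper names (load_module, looks_like_flow, discover_flows, register_all) are hypothetical, and flows are detected by duck typing rather than by importing prefect, so that the discovery logic stays independent of the installed Prefect version. The only Prefect calls used are the two from the answer, flow.register and flow.serialized_hash. Note that importing a file executes its top-level code, so a flow file should keep its own flow.register(...) call under an `if __name__ == "__main__":` guard.

```python
import importlib.util
from pathlib import Path


def load_module(path):
    """Import a Python file by path and return the module object."""
    spec = importlib.util.spec_from_file_location(path.stem, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def looks_like_flow(obj):
    # Assumption: anything that is not a class and exposes callable
    # register() and serialized_hash() is treated as a flow. A stricter
    # check would be isinstance(obj, prefect.Flow).
    if isinstance(obj, type):
        return False
    return callable(getattr(obj, "register", None)) and callable(
        getattr(obj, "serialized_hash", None)
    )


def discover_flows(folder):
    """Collect flow-like objects defined at module level in folder/*.py."""
    flows = []
    for path in sorted(Path(folder).glob("*.py")):
        module = load_module(path)
        flows.extend(obj for obj in vars(module).values() if looks_like_flow(obj))
    return flows


def register_all(flows, project_name):
    """Register each flow, passing its serialized hash as idempotency key."""
    for flow in flows:
        flow.register(project_name, idempotency_key=flow.serialized_hash())
```

A CI job could then call register_all(discover_flows("flows"), "main"), where "flows" is a placeholder folder name. Because of the idempotency key, runs in which a flow's backend representation has not changed should not create a new version.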
chriswhite answered Oct 19 '22 07:10
