I wasn't able to use Prefect's FlowRunner to solve the above question. I likely either used it incorrectly (see below) or missed something. Would really appreciate any pointers!
I read through the fantastic Prefect Core documentation and found the sections on Handling Failure and Local Debugging to be the most relevant (though I may have missed something!). The FlowRunner class appeared (to me) to be the solution.
To see whether I could use FlowRunner to resume a failed flow, I wrote this script:
from time import sleep

import prefect
from prefect import Flow, task

@task
def success():
    sleep(3)
    return

@task
def failure():
    return 1 / 0

def get_flow_runner():
    with Flow("Success/Failure") as flow:
        success()
        failure()
    return prefect.engine.FlowRunner(flow)
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: state = flow_runner.run()
Next, I replaced 1 / 0 with 1 / 1 in failure() so that the task would succeed, and finally passed the previous state to the flow_runner, hoping that it would resume the flow:
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: flow_runner.run(task_states=state.result)
The entire flow ran again, including the 3-second successful task.
The issue here is that you are rebuilding your Flow with each run, which changes the Task objects. state.result is a dictionary whose keys are Task objects; if the underlying Task object changes in any way, so will its hash. You should instead manually create the dictionary of states using the updated Task objects, like so:
from prefect.engine.state import Success
# `runner` is the FlowRunner built from the updated flow (`flow_runner` in the question)
failure_task = runner.flow.get_tasks(name="failure")[0]
task_states = {failure_task: Success("Mocked success")}
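To see why the states from the first run are not picked up, here is a small library-free sketch. It is not Prefect code: FakeTask and build_tasks are hypothetical stand-ins, and it assumes the objects are hashed by identity, as plain Python objects are. It shows that a dictionary keyed by the objects from one build finds nothing when queried with the objects from a rebuild, and that re-keying the old states by name is one way to carry them over.

```python
class FakeTask:
    """Hypothetical stand-in for a task object (not a Prefect class).

    Like any plain Python object, it is hashed by identity.
    """

    def __init__(self, name):
        self.name = name


def build_tasks():
    # Mimics rebuilding a flow: every call creates brand-new task objects.
    return [FakeTask("success"), FakeTask("failure")]


first_build = build_tasks()
# States recorded during the first run, keyed by that run's task objects.
old_states = {task: "Success" for task in first_build}

second_build = build_tasks()
# Same names, different objects: none of the new tasks are found as keys.
direct_hits = [task.name for task in second_build if task in old_states]

# Re-key the old states by name, then map them onto the new task objects.
states_by_name = {task.name: state for task, state in old_states.items()}
remapped = {
    task: states_by_name[task.name]
    for task in second_build
    if task.name in states_by_name
}

print(direct_hits)                              # []
print(sorted(task.name for task in remapped))   # ['failure', 'success']
```

In a real flow you would presumably carry over only the states worth keeping (for example, the successful ones), so that the corrected task actually runs again.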