Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to resume a Prefect flow on failure without having to re-run the entire flow?

Tags:

TL;DR;

I wasn't able to use prefect's FlowRunner to solve the above question. I likely either used it wrong (see below) or missed something. Would really appreciate any pointers!


The Problem

I read through the fantastic prefect core documentation and found the sections on Handling Failure and Local Debugging to be the most relevant to this (may have missed something!). The FlowRunner class appeared (to me) to be the solution.

To see if I could use Flow Runner to resume a failed flow:

  • Made a failing flow run:
from time import sleep

import prefect
from prefect import Flow, task


@task
def success():
    sleep(3)
    return


@task
def failure():
    return 1 / 0


def get_flow_runner():
    with Flow("Success/Failure") as flow:

        success()
        failure()

    return prefect.engine.FlowRunner(flow)
  • Ran it in iPython and saved the state:
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: state = flow_runner.run()
  • Replaced 1 / 0 with 1 / 1 in failure() so task would be successful:

  • And finally passed the previous state to the flow_runner hoping that it would resume the flow:

In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: flow_runner.run(task_states=state.result)

The entire flow ran again including the 3 second successful task.

like image 999
rdmolony Avatar asked Jul 27 '20 09:07

rdmolony


People also ask

How do you make a prefect flow?

You can create a flow run by calling the flow. For example, by running a Python script or importing the flow into an interactive session. You can also create a flow run by: Creating a deployment on Prefect Cloud or a locally run Prefect Orion server.

What is a task in prefect?

A task is a function that represents a discrete unit of work in a Prefect workflow. Tasks are not required — you may define Prefect workflows that consist only of flows, using regular Python statements and functions.

How do I run a prefect UI?

The Prefect UI is available via Prefect Cloud by logging into your account at https://app.prefect.cloud/. The Prefect UI is also available in any environment where a Prefect Orion server is running with prefect orion start .


1 Answers

The issue here is that you are rebuilding your Flow with each run, which changes the Task objects. state.result is a dictionary whose keys are Task objects - if the underlying Task object changes in any way, so will its hash. You should instead manually create the dictionary of states using the updated Task objects, like so:

from prefect.engine.state import Success

failure_task = runner.flow.get_tasks(name="failure")[0]
task_states = {failure_task: Success("Mocked success")}
like image 98
chriswhite Avatar answered Oct 01 '22 21:10

chriswhite