Prefect: how to avoid rerunning a task

In Prefect, suppose I have a pipeline that runs f(date) for every date in a list and saves each result to a file. This is a pretty common ETL operation. In Airflow, if I run this once, it will backfill all historical dates. If I run it again, it knows which tasks have already run and only runs the new ones that have appeared (i.e. the latest date).

In Prefect, to my knowledge, it will run the entire pipeline every day, even if 99% of the tasks were completed the day before. What are some solutions for dealing with this without switching to Prefect Cloud? Do you just do something like have every task record its completion in Redis before exiting?

Nezo asked Aug 14 '20 19:08


1 Answer

Prefect has many first-class ways of handling caching, depending on how much control you want. For every task, you can specify whether results should be cached, how long they should be cached, and how the cache should be invalidated (age, different inputs to the task, flow parameter values, etc.).

The simplest way to cache a task is to use targets, which let you declare that the task has a templatable side effect (usually a file in local or cloud storage, but it could also be a database entry, a Redis key, or anything else). Before the task runs, Prefect checks whether that side effect already exists; if it does, the run is skipped.
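To make the "check the side effect, then skip" idea concrete, here is a minimal pure-Python sketch of the mechanism. It is not Prefect's implementation: the `with_target` decorator, the temporary `TARGET_DIR`, and the use of `pickle` are hypothetical stand-ins for what `target=` plus a `Result` do for you.

```python
import datetime
import functools
import os
import pickle
import tempfile

# Hypothetical storage location standing in for a Prefect Result's directory.
TARGET_DIR = tempfile.mkdtemp()
CALLS = []  # records which task bodies actually executed


def with_target(template):
    """Hypothetical decorator emulating a templated target: skip the body if the file exists."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Render the target name from the task name and today's date.
            name = template.format(task_name=fn.__name__,
                                   today=datetime.date.today().isoformat())
            path = os.path.join(TARGET_DIR, name)
            if os.path.exists(path):
                # Side effect already present: load it instead of re-running.
                with open(path, "rb") as f:
                    return pickle.load(f)
            result = fn(*args, **kwargs)
            # Persist the result so the next call can skip the body.
            with open(path, "wb") as f:
                pickle.dump(result, f)
            return result
        return wrapper
    return decorator


@with_target("{task_name}-{today}")
def get_data():
    CALLS.append("get_data")
    return [1, 2, 3, 4, 5]


first = get_data()   # runs the body and writes the target file
second = get_data()  # target file exists, so the body is skipped
```

The second call returns the stored value without executing `get_data`'s body, which is the behaviour the target gives you across flow runs.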

For example, this task will write its result to a local file automatically templated with the task name and the current date:

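from prefect import task
from prefect.engine.results import LocalResult

@task(result=LocalResult(), target="{task_name}-{today}", checkpoint=True)  # targets need checkpointing enabled (Prefect 1.x)
def get_data():
    return [1, 2, 3, 4, 5]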

As long as a matching file exists, the task won't re-run. Because {today} is part of the target name, this implicitly caches the task's value for one day. You could also use a parameter in the template, such as the backfill date, to replicate Airflow's behavior.
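A sketch of the Airflow-style backfill from the question, assuming one output file per date whose name follows a `{task_name}-{date}` template. `process_date`, `run_backfill` and `OUT_DIR` are hypothetical names, and the explicit `os.path.exists` check is a plain-Python stand-in: in Prefect the date would appear in the `target` template (for example via a parameter, as suggested above) and the skip would happen for you.

```python
import datetime
import json
import os
import tempfile

# Hypothetical output directory; each date gets its own "target" file.
OUT_DIR = tempfile.mkdtemp()


def process_date(date):
    """Hypothetical f(date): returns a small record for that day."""
    return {"date": date.isoformat(), "value": date.day}


def run_backfill(dates, task_name="process_date"):
    """Run process_date for each date whose target file is missing; return the dates that ran."""
    ran = []
    for date in dates:
        # Template the target with the date, mirroring "{task_name}-{date}".
        path = os.path.join(OUT_DIR, f"{task_name}-{date.isoformat()}.json")
        if os.path.exists(path):
            continue  # already done on a previous run: skip it
        with open(path, "w") as f:
            json.dump(process_date(date), f)
        ran.append(date)
    return ran


start = datetime.date(2020, 8, 1)
history = [start + datetime.timedelta(days=i) for i in range(10)]
first_run = run_backfill(history)  # backfills all 10 historical dates
# Next day: the list gains one date, and only that date is processed.
second_run = run_backfill(history + [start + datetime.timedelta(days=10)])
```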

For more control, you can use Prefect's full cache mechanism by setting cache_for, cache_validator, and cache_key on any task. If set, the task will finish in a Cached state instead of a Success state. When paired with a proper orchestration backend like Prefect Server or Prefect Cloud, the Cached state can be queried by future runs of the same task (or any task with the same cache_key). That future task will return the Cached state as its own result.
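The following is a rough pure-Python emulation of how an expiry (`cache_for`), a validator (`cache_validator`) and a shared key (`cache_key`) interact. It is not Prefect code: `run_cached`, `CACHE`, `CachedEntry` and `inputs_match` are hypothetical, and the in-memory dict stands in for the Server/Cloud backend that stores Cached states between runs.

```python
import datetime
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class CachedEntry:
    value: Any
    inputs: dict
    expires_at: datetime.datetime


# Hypothetical shared store; in Prefect the orchestration backend plays this role.
CACHE: Dict[str, CachedEntry] = {}


def inputs_match(entry, inputs):
    """Validator: reuse the cached value only if the task inputs are identical."""
    return entry.inputs == inputs


def run_cached(fn, cache_key, cache_for, validator=inputs_match, now=None, **inputs):
    """Return (reused, value): reuse a valid, unexpired entry under cache_key, else run fn."""
    now = now or datetime.datetime.now()
    entry = CACHE.get(cache_key)
    if entry is not None and now < entry.expires_at and validator(entry, inputs):
        return True, entry.value
    value = fn(**inputs)
    # Any task using the same cache_key will see this entry on later runs.
    CACHE[cache_key] = CachedEntry(value, dict(inputs), now + cache_for)
    return False, value


def double(x):
    return 2 * x


t0 = datetime.datetime(2020, 8, 14, 12, 0)
one_hour = datetime.timedelta(hours=1)
a = run_cached(double, "double", one_hour, now=t0, x=2)  # computed
b = run_cached(double, "double", one_hour, now=t0 + datetime.timedelta(minutes=30), x=2)  # reused
c = run_cached(double, "double", one_hour, now=t0 + datetime.timedelta(minutes=30), x=3)  # inputs changed
d = run_cached(double, "double", one_hour, now=t0 + datetime.timedelta(hours=2), x=3)  # expired
```

Keeping the expiry check and the validator separate mirrors the split described above: `cache_for` bounds how long a result lives, while the validator decides whether a still-fresh result applies to the current inputs.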

jlowin answered Oct 23 '22 13:10