In Prefect, suppose I have a pipeline that runs f(date) for every date in a list and saves the result to a file. This is a common ETL operation. In Airflow, if I run this once, it will backfill all historical dates. If I run it again, it will know which tasks have already run and only run the new ones that have appeared (i.e. the latest date).

In Prefect, to my knowledge, the entire pipeline will run every day, even if 99% of the tasks were completed the day before. What are some ways to deal with this without switching to Prefect Cloud? Do you just have every task cache its completion in Redis before exiting?
In simple terms, a Prefect Task represents an individual unit of work. For example, any of the following could qualify as a Prefect Task: querying a database, formatting a string, or spinning up a Spark cluster.
Prefect treats flows as functions, which means they can be run at any time, with any concurrency, for any reason. However, flows may also have schedules. In Prefect terms, a schedule is nothing more than a way to indicate that you want to start a new run at a specific time.
Prefect has many first-class ways of handling caching, depending on how much control you want. For every task, you can specify whether results should be cached, how long they should be cached, and how the cache should be invalidated (age, different inputs to the task, flow parameter values, etc.).
The simplest way to cache a task is to use targets, which let you specify that the task has a templatable side effect (usually a file in local or cloud storage, but it could also be a database entry, a Redis key, or anything else). Before the task runs, it checks whether the side effect exists; if it does, the run is skipped.
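The check-before-run logic behind targets can be sketched in plain Python. This is a hedged illustration of the mechanism, not Prefect's actual implementation; `run_with_target` and its callback parameters are hypothetical names:

```python
import os


def run_with_target(target_path, compute, write, read):
    """Sketch of target-based caching: skip the computation when the
    templated side effect (here, a file) already exists on disk."""
    if os.path.exists(target_path):
        # Target exists: reuse the previously persisted result
        # instead of re-running the task.
        return read(target_path)
    result = compute()
    write(target_path, result)  # persist the side effect for future runs
    return result
```

The key point is that the cache check is based purely on the existence of the side effect, so anything that can be tested for existence (a file, a database row, a Redis key) can serve as a target.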
For example, this task will write its result to a local file automatically templated with the task name and the current date:
```python
from prefect import task
from prefect.engine.results import LocalResult

@task(result=LocalResult(), target="{task_name}-{today}")
def get_data():
    return [1, 2, 3, 4, 5]
```
As long as a matching file exists, the task won't re-run. Because `{today}` is part of the target name, that will implicitly cache the task's value for one day. You could also use a parameter in the template, like the backfill date, to replicate Airflow's behavior.
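To see how a date-templated target yields Airflow-style backfill behavior, here is a plain-Python sketch under the assumption that each date's output is materialized as a `{date}.json` file (the `backfill` helper and file layout are mine, not part of Prefect):

```python
import json
import os


def backfill(dates, f, out_dir):
    """Run f(date) only for dates whose target file is missing,
    mimicking a target template like "{date}.json"."""
    ran = []
    for date in dates:
        target = os.path.join(out_dir, f"{date}.json")
        if os.path.exists(target):
            continue  # already materialized on a previous run
        with open(target, "w") as fh:
            json.dump(f(date), fh)
        ran.append(date)
    return ran
```

Run the loop once and every historical date is computed; run it again with one new date appended and only that date's target is missing, so only that date is computed.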
For more control, you can use Prefect's full cache mechanism by setting `cache_for`, `cache_validator`, and `cache_key` on any task. If set, the task will finish in a `Cached` state instead of a `Success` state. When paired with a proper orchestration backend like Prefect Server or Prefect Cloud, the `Cached` state can be queried by future runs of the same task (or any task with the same `cache_key`). That future task will return the `Cached` state as its own result.
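Conceptually, `cache_for` combined with an inputs-based `cache_validator` behaves like a TTL'd, input-keyed memo. A minimal sketch of that idea (the `cached` decorator is my own illustration, not Prefect's API):

```python
import time
from functools import wraps


def cached(cache_for_seconds):
    """Memoize a function on its arguments, expiring entries after a TTL.
    Roughly analogous to cache_for with an all-inputs validator: a hit
    plays the role of returning a Cached state instead of recomputing."""
    store = {}  # cache key -> (timestamp, result)

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args):
            key = (fn.__name__, args)
            hit = store.get(key)
            if hit is not None and time.time() - hit[0] < cache_for_seconds:
                return hit[1]  # cache still valid: skip the computation
            result = fn(*args)
            store[key] = (time.time(), result)
            return result
        return wrapper
    return decorator
```

The difference in Prefect is that the cache lives in the orchestration backend rather than in process memory, which is why this mechanism only persists across flow runs when you use Prefect Server or Prefect Cloud.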