I just started reading about Prefect (and have a little experience using Airflow).
My goal is to set up a task in Prefect that runs daily and collects data into a folder (which I assume Prefect can handle). I also want to populate historical data, as if the job had been running in the past.
In Airflow there is the concept of start_date: when it is set in the past, the DAG runs from that date onward and populates each interval.
For example, if I have a task which takes a date and returns the data for that date, such as:
# Pseudo code
def get_values_from_somewhere(date: datetime) -> dict:
    return fetched_values_in_json(date)
Is there a native way to do this in Prefect? I could not find this answered anywhere in here or the docs, though backfilling is mentioned here. Any help / guidance will be super useful.
What I tried:
When I set the schedule to be:
from datetime import datetime, timedelta
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

schedule = Schedule(clocks=[IntervalClock(interval=timedelta(hours=24), start_date=datetime(2019, 1, 1))])
and then call flow.run(), I simply get:
INFO:prefect.My-Task:Waiting for next scheduled run at 2020-09-24T00:00:00+00:00
What I was expecting was for it to run from the start_date I provided, catch up to the present time, and then wait for the next scheduled run.
Prefect does not make any implicit assumptions about how your Flow or its Tasks depend on time, and therefore performing a backfill depends on the structure of your Flow. There are generally two ways that time explicitly influences a Flow:
- Parameter or DateTimeParameter
- prefect.context (which includes many time-related fields, described here)

Given that, performing a backfill can be achieved by creating any number of ad-hoc scheduled flow runs and overriding either the appropriate context key or the default parameter value. (Note that ad-hoc runs can be created for any flow, regardless of whether that flow has a schedule.)
To make this more precise, here are two examples that trigger a single backfill run (to accommodate more runs, loop over the appropriate values and create a run for each):
# Example 1: using context
import pendulum
import prefect
from prefect import Flow

@prefect.task
def do_something_time_specific():
    """
    This task uses the timestamp provided to the custom `backfill_time`
    context key; if that does not exist, it falls back on the built-in
    `scheduled_start_time` context key.
    """
    current_time = prefect.context.get("backfill_time") or prefect.context.get("scheduled_start_time")
    if isinstance(current_time, str):
        current_time = pendulum.parse(current_time)
    # performs some action dealing with this timestamp

flow = Flow("backfill", tasks=[do_something_time_specific])

## using Core
flow.run()  # will use current timestamp
flow.run(context={"backfill_time": "1986-01-02"})  # will use old timestamp

## using an API
prefect.Client().create_flow_run(flow_id="FLOWID",
    context={"backfill_time": "1986-01-02"})  # will use old timestamp

# Example 2: using a Parameter
import pendulum
import prefect
from prefect import Flow

current_time = prefect.Parameter("current_time", default=None)

@prefect.task
def do_something_time_specific(current_time):
    """
    This task uses the timestamp provided to the task explicitly.
    """
    current_time = current_time or pendulum.now("utc")  # uses "now" if not provided
    if isinstance(current_time, str):
        current_time = pendulum.parse(current_time)
    # performs some action dealing with this timestamp

with Flow("backfill") as flow:
    do_something_time_specific(current_time)

## using Core
flow.run()  # will use current timestamp
flow.run(current_time="1986-01-02")  # will use old timestamp

## using an API
prefect.Client().create_flow_run(flow_id="FLOWID",
    parameters={"current_time": "1986-01-02"})  # will use old timestamp

Newer parameter classes such as DateTimeParameter provide nicer typing guarantees, but hopefully this demonstrates the idea.
EDIT: For completeness, note that ad-hoc runs can be created in Core for flows with schedules by running flow.run(run_on_schedule=False)
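To cover more than one date, as mentioned earlier, you can loop over the dates and create one run per value. The following is only a minimal sketch of that loop, not a Prefect feature: daily_dates and backfill are hypothetical helper names, and run_for_date stands for whatever callable starts a single run (for example a small wrapper around flow.run or Client().create_flow_run).

```python
from datetime import date, timedelta
from typing import Callable, Iterator, List


def daily_dates(start: date, end: date) -> Iterator[date]:
    """Yield every day from `start` up to, but not including, `end`."""
    current = start
    while current < end:
        yield current
        current += timedelta(days=1)


def backfill(run_for_date: Callable[[str], object], start: date, end: date) -> List[object]:
    """Call `run_for_date` once per day with an ISO date string, oldest first.

    `run_for_date` is a hypothetical hook: pass in whatever triggers one run.
    """
    return [run_for_date(day.isoformat()) for day in daily_dates(start, end)]
```

With the Parameter-based flow above, one possible call would be backfill(lambda d: flow.run(parameters={"current_time": d}, run_on_schedule=False), date(2019, 1, 1), date.today()). This runs the days one after another in the local process; when going through the API, the lambda could call create_flow_run instead.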