 

Is there a way to backfill historical data (once) for a new Flow in Prefect?

Tags: etl, prefect

I just started reading about Prefect (and have a little experience using Airflow).

My goal is to set up a task that runs daily in Prefect and collects data into a folder (which I assume Prefect can certainly help with). I also want to populate the historical data, as if I had been running this job in the past.

In Airflow there is the concept of a start_date: when it is set in the past, the DAG runs from that date onward, once for each interval, populating the data.

For example, if I have a task which takes a date and returns the data for that date, such as:

# Pseudo code: fetched_values_in_json stands in for the real data source
from datetime import datetime

def get_values_from_somewhere(date: datetime) -> dict:
    return fetched_values_in_json(date)
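For instance, the daily job could look like the minimal sketch below. It assumes the fetched data is a JSON-serializable dict and that one file per day is written into the target folder; `collect_day` and its `fetch` argument (a stand-in for `fetched_values_in_json`) are hypothetical names. Skipping days whose file already exists makes it safe to re-run the same dates during a backfill.

```python
import json
from datetime import date
from pathlib import Path
from typing import Callable


def collect_day(day: date, folder: Path, fetch: Callable[[date], dict]) -> Path:
    """Write the data for `day` to folder/YYYY-MM-DD.json and return the path.

    `fetch` is a hypothetical stand-in for the real data source.
    If the file already exists, the day is skipped, so repeated
    backfills neither re-fetch nor overwrite data.
    """
    folder.mkdir(parents=True, exist_ok=True)
    target = folder / f"{day.isoformat()}.json"
    if not target.exists():
        target.write_text(json.dumps(fetch(day)))
    return target
```

Backfilling then reduces to calling `collect_day` once per historical date, which is exactly what a scheduler's catch-up would do.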

Is there a native way to do this in Prefect? I could not find an answer to this here or in the docs, although backfilling is mentioned here. Any help or guidance would be very useful.

What I tried:

When I set the schedule to:

from datetime import datetime, timedelta

from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

schedule = Schedule(
    clocks=[IntervalClock(interval=timedelta(hours=24), start_date=datetime(2019, 1, 1))]
)

and then call flow.run(), I simply get:

INFO:prefect.My-Task:Waiting for next scheduled run at 2020-09-24T00:00:00+00:00

I was expecting the flow to run once for every interval since the start_date I provided, catch up to the present time, and then wait for the next scheduled run.
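The expected catch-up amounts to enumerating every interval start between the start date and now, running each, and only then waiting for the next future interval. A standard-library sketch of that enumeration; `missed_run_times` is a hypothetical helper, not part of Prefect:

```python
from datetime import datetime, timedelta, timezone
from typing import List


def missed_run_times(start: datetime, interval: timedelta, now: datetime) -> List[datetime]:
    """Return every interval start from `start` up to and including `now`.

    These are the runs a catch-up would execute before waiting
    for the next scheduled time in the future.
    """
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    times = []
    t = start
    while t <= now:
        times.append(t)
        t += interval
    return times
```

For example, a 24-hour interval starting 2019-01-01 00:00 UTC, evaluated at 2019-01-03 12:00 UTC, yields the three midnights of January 1, 2 and 3.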

Newskooler, asked Sep 23 '20 13:09


1 Answer

Prefect does not make any implicit assumptions about how your Flow or its Tasks depend on time, and therefore performing a backfill depends on the structure of your Flow. There are generally two ways that time explicitly influences a Flow:

  • through a Parameter or DateTimeParameter
  • through prefect.context (which includes many time-related fields, described here)

Given that, performing a backfill can be achieved by creating any number of ad-hoc scheduled flow runs and overriding either the appropriate context key or the default parameter value. (Note that ad-hoc runs can be created for any flow, regardless of whether that flow has a schedule.)

To make this more precise, here are two examples that trigger a single backfill run (to accommodate more runs, loop over the appropriate values and create a run for each):

Using Context

import pendulum
import prefect


@prefect.task
def do_something_time_specific():
    """
    This task uses the timestamp provided to the custom `backfill_time`
    context key; if that does not exist, it falls back on the built-in
    `scheduled_start_time` context key.
    """

    current_time = prefect.context.get("backfill_time") or prefect.context.get("scheduled_start_time")
    if isinstance(current_time, str):
        current_time = pendulum.parse(current_time)
    # performs some action dealing with this timestamp


flow = prefect.Flow("backfill", tasks=[do_something_time_specific])

## using Core
flow.run() # will use current timestamp
flow.run(context={"backfill_time": "1986-01-02"}) # will use old timestamp

## using an API
prefect.Client().create_flow_run(flow_id="FLOWID", 
    context={"backfill_time": "1986-01-02"}) # will use old timestamp
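To backfill more than one day this way, loop over the dates and create one run per value. The sketch below takes the run function as an argument so it can be exercised without Prefect; with Prefect Core you would pass `flow.run`, which accepts `context=` as in the example above. `backfill_with_context` is a hypothetical helper.

```python
from datetime import date, timedelta
from typing import Any, Callable, List


def backfill_with_context(run: Callable[..., Any], start: date, end: date) -> List[Any]:
    """Call run(context={"backfill_time": "YYYY-MM-DD"}) once per day.

    `start` and `end` are inclusive. With Prefect Core, `run` would be
    `flow.run`; for the API, a small wrapper around
    `Client().create_flow_run` that also supplies `flow_id`.
    """
    results = []
    day = start
    while day <= end:
        results.append(run(context={"backfill_time": day.isoformat()}))
        day += timedelta(days=1)
    return results
```

Usage with Core would be along the lines of `backfill_with_context(flow.run, date(2019, 1, 1), date.today())`.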

Using a Parameter

import pendulum
import prefect


current_time = prefect.Parameter("current_time", default=None)

@prefect.task
def do_something_time_specific(current_time):
    """
    This task uses the timestamp provided to the task explicitly.
    """
    current_time = current_time or pendulum.now("utc") # uses "now" if not provided
    if isinstance(current_time, str):
        current_time = pendulum.parse(current_time)

    # performs some action dealing with this timestamp


with prefect.Flow("backfill") as flow:
    do_something_time_specific(current_time)


## using Core
flow.run() # will use current timestamp
flow.run(current_time="1986-01-02") # will use old timestamp

## using an API
prefect.Client().create_flow_run(flow_id="FLOWID", 
    parameters={"current_time": "1986-01-02"}) # will use old timestamp
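The Parameter-based flow can be backfilled through the API the same way: one `create_flow_run` call per timestamp. In this sketch `client` is any object exposing `create_flow_run(flow_id=..., parameters=...)` (the keywords used above, so `prefect.Client()` against a real backend); `backfill_with_parameters` is a hypothetical helper, and a fake client is enough to run it locally.

```python
from datetime import datetime, timedelta
from typing import Any, List


def backfill_with_parameters(client: Any, flow_id: str, start: datetime,
                             end: datetime, step: timedelta) -> List[Any]:
    """Create one flow run per timestamp in [start, end], `step` apart.

    `client` is expected to expose create_flow_run(flow_id=..., parameters=...).
    Each run receives its timestamp as an ISO 8601 string, which the task
    above turns back into a datetime with pendulum.parse.
    """
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    run_ids = []
    current = start
    while current <= end:
        run_ids.append(client.create_flow_run(
            flow_id=flow_id,
            parameters={"current_time": current.isoformat()},
        ))
        current += step
    return run_ids
```

Taking `step` as an argument keeps the helper usable for non-daily schedules, e.g. `timedelta(hours=6)`.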

Newer parameter classes such as DateTimeParameter provide nicer typing guarantees, but hopefully this demonstrates the idea.

EDIT: For completeness, note that in Core, ad-hoc runs can also be created for flows that have a schedule by calling flow.run(run_on_schedule=False).

chriswhite, answered Nov 15 '22 14:11