Airflow Dagrun for each datum instead of scheduled

The current problem that I am facing is that I have documents in a MongoDB collection which each need to be processed and updated by tasks which need to run in an acyclic dependency graph. If a task upstream fails to process a document, then none of the dependent tasks may process that document, as that document has not been updated with the prerequisite information.

If I were to use Airflow, this leaves me with two solutions:

  1. Trigger a DAG for each document, and pass in the document ID with --conf. The problem is that this is not how Airflow is intended to be used; I would never be running a scheduled process, and based on how documents appear in the collection, I would be making 1,440 DagRuns per day (a sketch follows the list).

  2. Run a DAG every period to process all documents created in the collection during that period (also sketched below). This follows how Airflow is expected to work, but the problem is that if a task fails to process a single document, none of the dependent tasks may process any of the other documents. Also, if one document takes longer than the others to be processed by a task, all of the other documents are left waiting on that single document before they can continue down the DAG.
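
A minimal sketch of option 1, assuming an Airflow 2.x deployment and a hypothetical process_document DAG: the document ID is passed in with --conf and read back from dag_run.conf inside the task.

# Hypothetical DAG that processes a single document, triggered once per document, e.g.:
#   airflow dags trigger --conf '{"document_id": "abc123"}' process_document
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def process(**context):
    # The document ID passed with --conf; present only when the run was triggered with one
    doc_id = context["dag_run"].conf.get("document_id")
    print("Processing document {}".format(doc_id))


with DAG(
    dag_id="process_document",
    start_date=datetime(2019, 10, 1),
    schedule_interval=None,  # never scheduled, only triggered externally
    catchup=False,
) as dag:
    PythonOperator(task_id="process", python_callable=process)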
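
And a sketch of option 2, assuming Airflow 2.2+ (for data_interval_start / data_interval_end in the task context) and a hypothetical MongoDB collection named documents with a created_at field: each scheduled run only picks up the documents created during its own data interval.

# Hypothetical hourly DAG; every run processes the documents created in its data interval.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from pymongo import MongoClient


def process_batch(data_interval_start, data_interval_end, **context):
    coll = MongoClient("mongodb://localhost:27017")["mydb"]["documents"]
    query = {"created_at": {"$gte": data_interval_start, "$lt": data_interval_end}}
    for doc in coll.find(query):
        # Process and update each document here; a single bad document fails the
        # whole task, which is exactly the drawback described above.
        pass


with DAG(
    dag_id="process_documents_hourly",
    start_date=datetime(2019, 10, 1),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    PythonOperator(task_id="process_batch", python_callable=process_batch)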

Is there a better method than Airflow? Or is there a better way to handle this in Airflow than the two methods I currently see?

asked Oct 16 '19 by Sebastian Mendez


People also ask

How do I change the DAG schedule in Airflow?

To schedule a DAG, Airflow looks at the last execution date and adds the schedule interval; if that time has passed, it runs the DAG. You cannot simply update the start date. A simple way to change the schedule is to edit your start date and schedule interval, rename your DAG (e.g. xxxx_v2.py), and redeploy it.
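
A hypothetical illustration of that workaround, as a renamed copy of the DAG file with the edited settings:

# xxxx_v2.py -- renamed copy of the original DAG file with the new schedule
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="xxxx_v2",                  # new dag_id, so the scheduler treats it as a fresh DAG
    start_date=datetime(2019, 10, 1),  # edited start date
    schedule_interval="@daily",        # edited schedule interval
    catchup=False,
) as dag:
    ...  # same tasks as before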

How do I stop a running DAG in Airflow?

Note that if the DAG is currently running, the Airflow scheduler will restart any tasks you delete. So either stop the DAG first by changing its state, or stop the scheduler (if you are running in a test environment).
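
For example, in Airflow 2.x the DAG can be paused through the stable REST API before clearing or deleting its task instances (the URL, credentials, and dag id below are hypothetical):

import requests

# Pause the DAG so the scheduler stops scheduling new task instances for it
requests.patch(
    "http://localhost:8080/api/v1/dags/my_dag",
    json={"is_paused": True},
    auth=("admin", "admin"),
)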

Can you choose to execute more than one task with the BranchPythonOperator?

You have multiple tasks, but only one of them should be executed depending on a criterion? You've come to the right place! The BranchPythonOperator does exactly what you are looking for. It's common to have DAGs with different execution flows where you want to follow only one according to a value or a condition.
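
A minimal sketch of that pattern, assuming a recent Airflow 2.x install (DAG and task names are made up): the callable returns the task_id of the branch to follow, and every other downstream task is skipped.

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator


def choose_branch(**context):
    # Return the task_id (or a list of task_ids) that should run; the others are skipped
    if context["logical_date"].weekday() < 5:
        return "weekday_task"
    return "weekend_task"


with DAG(
    dag_id="branch_example",
    start_date=datetime(2019, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    branch = BranchPythonOperator(task_id="branch", python_callable=choose_branch)
    branch >> [EmptyOperator(task_id="weekday_task"), EmptyOperator(task_id="weekend_task")]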

What is Start_date in Airflow DAG?

Similarly, since the start_date argument for the DAG and its tasks points to the same logical date, it marks the start of the DAG's first data interval, not when tasks in the DAG will start running. In other words, a DAG run will only be scheduled one interval after start_date .
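
For example, with a @daily schedule the first data interval starts at start_date, and the first DAG run is only triggered once that interval has ended, a day later (dag id hypothetical):

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="start_date_example",
    start_date=datetime(2019, 10, 16),  # start of the first data interval
    schedule_interval="@daily",
    catchup=False,
) as dag:
    ...  # the first run covers 2019-10-16 -> 2019-10-17 and only starts at 2019-10-17 00:00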


1 Answer

From the knowledge I gained in my attempt to answer this question, I've come to the conclusion that Airflow is just not the tool for the job.

Airflow is designed for scheduled, idempotent DAGs. A DagRun must also have a unique execution_date; this means that running the same DAG at the exact same start time (in the case that we receive two documents at the same time) is quite literally impossible. Of course, we can schedule the next DagRun immediately in succession, but this limitation should demonstrate that any attempt to use Airflow in this fashion will always be, to an extent, a hack.

The most viable solution I've found is to instead use Prefect, which was developed with the intention of overcoming some of the limitations of Airflow:

"Prefect assumes that flows can be run at any time, for any reason."

Prefect's equivalent of a DAG is a Flow; one key advantage of a Flow that we can take advantage of here is the ease of parametrization. Then, with some threads, we're able to run a Flow for each element in a stream. Here is an example streaming ETL pipeline:

import time
from threading import Thread

from prefect import task, Flow, Parameter


def stream():
    # Simulate documents arriving over time
    for x in range(10):
        yield x
        time.sleep(1)


@task
def extract(x):
    # If 'x' referenced a document, in this step we could load that document
    return x


@task
def transform(x):
    return x * 2


@task
def load(y):
    print("Received y: {}".format(y))


with Flow("ETL") as flow:
    x_param = Parameter('x')
    e = extract(x_param)
    t = transform(e)
    l = load(t)

for x in stream():
    # Run the flow in its own thread for each incoming element,
    # passing the element as the 'x' parameter
    thread = Thread(target=flow.run, kwargs={"x": x})
    thread.start()

answered Sep 20 '22 by Sebastian Mendez