I have the following task to solve:
Files are being sent at irregular times through an endpoint and stored locally. I need to trigger a DAG run for each of these files. For each file the same tasks will be performed
Overall the flows looks as follows: For each file, run tasks A->B->C->D
Files are being processed in batch. While this task seemed trivial to me, I have found several ways to do this and I am confused about which one is the "proper" one (if any).
That is, expose a web service which ingests the request and the file, stores it to a folder, and uses the experimental REST api to trigger the DAG, by passing the file_id as conf
Cons: REST apis are still experimental, not sure how Airflow can handle a load test with many requests coming at one point (which shouldn't happen, but, what if it does?)
Always using the same ws as described before, but this time it justs stores the file. Then we have:
Cons: Need to avoid that the same files are being sent to two different DAG runs. Example:
Files in folder x.json Sensor finds x, triggers DAG (1)
Sensor goes back and scheduled again. If DAG (1) did not process/move the file, the sensor DAG might reschedule a new DAG run with the same file. Which is unwanted.
As seen in this question.
Cons: This could work, however what I dislike is that the UI will probably get messed up because every DAG run will not look the same but it will change with the number of files being processed. Also if there are 1000 files to be processed the run would probably be very difficult to read
I am not yet sure how they completely work as I have seen they are not encouraged (at the end), however it should be possible to spawn a subdag for each file and have it running. Similar to this question.
Cons: Seems like subdags can only be used with the sequential executor.
Am I missing something and over-thinking something that should be (in my mind) quite straight-forward? Thanks
concurrency : This is the maximum number of task instances allowed to run concurrently across all active DAG runs for a given DAG. This allows you to set 1 DAG to be able to run 32 tasks at once, while another DAG might only be able to run 16 tasks at once.
Airflow scans the dags_folder for new DAGs every dag_dir_list_interval , which defaults to 5 minutes but can be modified. You might have to wait until this interval has passed before a new DAG appears in the UI.
The precedence rules for a task are as follows: Explicitly passed arguments. Values that exist in the default_args dictionary. The operator's default value, if one exists.
I know I am late, but I would choose the second pattern: "2 dags. One senses and triggers with TriggerDagOperator, one processes", because:
Renaming and/or moving files is a pretty standard way to process files in every ETL.
By the way, I always recommend this article https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753. It doesn't
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With