Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Airflow - Locking between tasks so that only one parallel task runs at a time?

I have one DAG that has three task streams (licappts, agents, agentpolicy):

enter image description here

For simplicity I'm calling these three distinct streams. The streams are independent in the sense that just because agentpolicy failed doesn't mean the other two (liceappts and agents) should be affected by the other streams failure.

But for the sourceType_emr_task_1 tasks (i.e., licappts_emr_task_1, agents_emr_task_1, and agentpolicy_emr_task_1) I can only run one of these tasks at a time. For example I can't run agents_emr_task_1 and agentpolicy_emr_task_1 at the same time even though they are two independent tasks that don't necessarily care about each other.

How can I achieve this functionality in Airflow? For now the only thing I can think of is to wrap that task in a script that somehow locks a global variable, then if the variable is locked I'll have the script do a Thread.sleep(60 seconds) or something, and then retry. But that seems very hacky and I'm curious if Airflow offers a solution for this.

I'm open to restructuring the ordering of my DAG if needed to achieve this. One thing I thought about doing was to make a hard coded ordering of

Dag Starts -> ... -> licappts_emr_task_1 -> agents_emr_task_1 -> agentpolicy_emr_task_1 -> DAG Finished

But I don't think combining the streams this way because then for example agentpolicy_emr_task_1 has to wait for the other two to finish before it can start and there could be times when agentpolicy_emr_task_1 is ready to go before the other two have finished their other tasks.

So ideally I want whatever sourceType_emr_task_1 task to start that's ready first and then block the other tasks from running their sourceType_emr_task_1 task until it's finished.

Update:

Another solution I just thought of is if there is a way for me to check on the status of another task I could create a script for sourceType_emr_task_1 that checks to see if any of the other two sourceType_emr_task_1 tasks have a status of running, and if they do it'll sleep and periodically check to see if none of the other's are running, in which case it'll start it's process. I'm not a big fan of this way though because I feel like it could cause a race condition where both read (at the same time) that none are running and both start running.

like image 728
Kyle Bridenstine Avatar asked Jul 30 '18 18:07

Kyle Bridenstine


1 Answers

You could use a pool to ensure the parallelism for those tasks is 1.

For each of the *_emr_task_1 tasks, set a pool kwarg to to be something like pool=emr_task.

Then just go into the webserver -> admin -> pools -> create: Set the name Pool to match the pool used in your operator, and the Slots to be 1.

This will ensure the scheduler will only allow tasks to be queued for that pool up to the number of slots configured, regardless of the parallelism of the rest of Airflow.

like image 198
cwurtz Avatar answered Sep 22 '22 20:09

cwurtz