My workflow, implemented in Airflow, contains tasks A, B, C, and D. I want the workflow to wait at task C for an event. In Airflow, sensors check for some condition by polling for a state; when the condition becomes true, the next task in the workflow is triggered. My requirement is to avoid polling. One answer here mentions Airflow's rest_api_plugin, which creates REST endpoints for triggering the Airflow CLI - using this plugin I can trigger a task in the workflow. In my workflow, however, I want a task that waits for a REST API call (an async event) without polling: once it receives the REST request, the task is triggered and the workflow resumes.
Reasons to avoid polling: it is inefficient and does not scale to our requirements.
Update
I followed the suggestion in the answer by @Daniel Huang and created a sensor that always returns False. The sensor is implemented in the task start_evaluating_cycle; it does not sense anything, it simply returns False every time:
from airflow.operators.sensors import BaseSensorOperator

class WaitForEventSensor(BaseSensorOperator):
    def poke(self, context):
        # never succeed on its own: wait for something external
        # to mark this task as complete
        return False

start_evaluating_cycle = WaitForEventSensor(
    task_id="start_evaluating_cycle",
    dag=dag,
    poke_interval=60 * 60,  # any number will do here, because it is not polling, just returning False
)
I configured the rest_api_plugin and, using it, I am trying to mark the task start_evaluating_cycle as complete so that the workflow continues.
The rest_api_plugin executes the task successfully, and I can see in Flower that the task ran:
But in the workflow, the task start_evaluating_cycle is still in the running state:
The rest_api_plugin is running the task independently of the workflow. How can I make the rest_api_plugin run the task inside the workflow, not independently of it?
However, when I select the task in the Airflow admin UI and mark it as success:
It takes me to this url:http://localhost:8080/admin/airflow/success?task_id=start_evaluating_cycle&dag_id=faculty_evaluation_workflow&upstream=false&downstream=false&future=false&past=false&execution_date=2017-11-26T06:48:54.297289&origin=http%3A%2F%2Flocalhost%3A8080%2Fadmin%2Fairflow%2Fgraph%3Fexecution_date%3D2017-11-26T06%253A48%253A54.297289%26arrange%3DTB%26root%3D%26dag_id%3Dfaculty_evaluation_workflow%26_csrf_token%3DImM3NmU4ZTVjYTljZTQzYWJhNGI4Mzg2MmZmNDU5OGYxYWY0ODAxYWMi.DPv1Ww.EnWS6ffVLNcs923y6eVRV_8R-X8
and when I confirm, the workflow proceeds further - which is what I want. But I need to mark the success through a REST API call.
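To reproduce that request from code rather than the browser, one option is to call the same mark-success endpoint the UI uses. A minimal sketch, assuming the endpoint path and parameters copied from the URL above (note the real UI flow also carries a CSRF token and a confirmation step, which this sketch does not handle):

```python
from urllib.parse import urlencode

# Endpoint the Airflow UI hits when you click "Mark Success"
# (path and parameter names taken from the URL above).
BASE_URL = "http://localhost:8080/admin/airflow/success"

def mark_success_url(dag_id, task_id, execution_date):
    params = {
        "task_id": task_id,
        "dag_id": dag_id,
        "upstream": "false",
        "downstream": "false",
        "future": "false",
        "past": "false",
        "execution_date": execution_date,
    }
    return BASE_URL + "?" + urlencode(params)

# e.g. requests.get(mark_success_url("faculty_evaluation_workflow",
#                                    "start_evaluating_cycle",
#                                    "2017-11-26T06:48:54.297289"))
```

Keep in mind this is the admin UI, not a stable API: the CSRF token and confirmation redirect would need handling too, so treat this only as a starting point.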
My concern is: how can I mark the task as success through a REST API call, the same way the UI's mark-success action does?
One possible solution is using a sensor that waits forever until something external manually sets its state to success.
So you'd have some sort of dummy sensor:
class DummySensor(BaseSensorOperator):
    def poke(self, context):
        return False
Initialized like this:
task_c = DummySensor(
    dag=dag,
    task_id='task_c',
    poke_interval=600,  # something fairly high since we're not polling for anything, just to check when to time out
    timeout=3600,  # if nothing external sets the state to success within 1 hour, the task fails and task D does not run
)
When Task C starts, it will just sit in RUNNING state. Then you can use the REST API Plugin to set Task C's state to SUCCESS when the condition is met. Task D and other downstream tasks will then get kicked off.
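"Setting the state to SUCCESS" ultimately means updating the task instance's row in Airflow's metadata database, which is what the scheduler reads to decide whether downstream tasks can run. A minimal sketch against a stand-in SQLite database - the table and column names mirror Airflow's task_instance table, but the real metadata DB connection and session handling are assumptions here:

```python
import sqlite3

def mark_task_success(conn, dag_id, task_id, execution_date):
    # Flip the task instance's state to 'success'; the scheduler will then
    # consider its downstream tasks runnable. (Stand-in for doing the same
    # through Airflow's ORM session against the real metadata database.)
    conn.execute(
        "UPDATE task_instance SET state = 'success' "
        "WHERE dag_id = ? AND task_id = ? AND execution_date = ?",
        (dag_id, task_id, execution_date),
    )
    conn.commit()
```

In practice you would do this through Airflow's own session and TaskInstance model rather than raw SQL, but the effect - one state column changing - is the same.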
The downside is that the dummy sensor still holds a worker slot while it waits, doing nothing.