 

How to wait for an asynchronous event in a task of a DAG in a workflow implemented using Airflow?

My workflow, implemented using Airflow, contains tasks A, B, C, and D. I want the workflow to wait at task C for an event. In Airflow, sensors are used to check for some condition by polling for a state; if the condition is true, the next task in the workflow is triggered. My requirement is to avoid polling. One answer here mentions the rest_api_plugin for Airflow, which creates REST API endpoints for triggering the Airflow CLI; using this plugin I can trigger a task in the workflow. What I want, however, is a task that waits for a REST API call (an asynchronous event) without polling: once the task receives the REST request, it completes and the workflow resumes.

Reasons to avoid polling: it is inefficient and does not scale to our requirements.

Update

I followed the suggestion in the answer by @Daniel Huang and created a sensor that always returns False. The sensor is implemented in the task start_evaluating_cycle; it does not actually sense anything, it just keeps returning False:

from airflow.operators.sensors import BaseSensorOperator  # Airflow 1.x import path


class WaitForEventSensor(BaseSensorOperator):

    def poke(self, context):
        # never succeed on its own; something external must mark this task successful
        return False

start_evaluating_cycle = WaitForEventSensor(
    task_id="start_evaluating_cycle",
    dag=dag,
    poke_interval=60 * 60,  # any value will do here: we are not polling, just returning False
)


I configured the rest_api_plugin, and using the plugin I am trying to mark the task start_evaluating_cycle as complete so the workflow continues.


The rest_api_plugin executes the task successfully, and I can see in Flower that the task ran.


But in the workflow, the task start_evaluating_cycle is still in the running state.


The rest_api_plugin is running the task independently of the workflow. How can I make the rest_api_plugin run the task inside the workflow, not independently of it?

However, when I select the task in the Airflow admin UI and mark it as a success:


it takes me to this URL:

http://localhost:8080/admin/airflow/success?task_id=start_evaluating_cycle&dag_id=faculty_evaluation_workflow&upstream=false&downstream=false&future=false&past=false&execution_date=2017-11-26T06:48:54.297289&origin=http%3A%2F%2Flocalhost%3A8080%2Fadmin%2Fairflow%2Fgraph%3Fexecution_date%3D2017-11-26T06%253A48%253A54.297289%26arrange%3DTB%26root%3D%26dag_id%3Dfaculty_evaluation_workflow%26_csrf_token%3DImM3NmU4ZTVjYTljZTQzYWJhNGI4Mzg2MmZmNDU5OGYxYWY0ODAxYWMi.DPv1Ww.EnWS6ffVLNcs923y6eVRV_8R-X8

and when I confirm, the workflow proceeds further, which is what I want; but I need to mark the success from a REST API call.

My concerns are:

  1. How can I mark a task running inside a workflow as successful using the rest_api_plugin?
  2. Is it possible for an external system to mark a task as successful by calling the URL that the Airflow admin UI creates?
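On the second concern: the mark-success URL is an ordinary GET endpoint with query parameters, so an external system can in principle construct and call it. The caveats are that it requires an authenticated session and a valid `_csrf_token` (which the admin UI embeds in the page), so a plain unauthenticated request may be rejected. As a sketch, such a URL can be assembled with the standard library; the host and parameter values below are placeholders copied from the example above, not from a real deployment:

```python
from urllib.parse import urlencode

# Placeholder values: in a real call, host, execution_date (and the CSRF
# token / session cookie) must come from a live Airflow instance.
base = "http://localhost:8080/admin/airflow/success"
params = {
    "task_id": "start_evaluating_cycle",
    "dag_id": "faculty_evaluation_workflow",
    "upstream": "false",
    "downstream": "false",
    "future": "false",
    "past": "false",
    "execution_date": "2017-11-26T06:48:54.297289",
}
url = base + "?" + urlencode(params)
print(url)
```

Whether calling this URL from outside the browser actually works depends on the Airflow version's authentication and CSRF settings, so treat it as fragile compared to a proper API endpoint.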
asked Nov 23 '17 08:11 by javed


1 Answer

One possible solution is using a sensor that waits forever until something external manually sets its state to success.

So you'd have some sort of dummy sensor:

from airflow.operators.sensors import BaseSensorOperator  # Airflow 1.x import path


class DummySensor(BaseSensorOperator):

    def poke(self, context):
        # always False: waits until something external marks the task successful
        return False

Initialized like this:

task_c = DummySensor(
    dag=dag,
    task_id='task_c',
    poke_interval=600,  # something fairly high since we're not polling for anything, just checking when to time out
    timeout=3600,  # if nothing external sets the state to success within 1 hour, the task fails and task D does not run
)

When Task C starts, it will just sit in RUNNING state. Then you can use the REST API Plugin to set Task C's state to SUCCESS when the condition is met. Task D and other downstream tasks will then get kicked off.

The downside is that the dummy sensor still holds a worker slot while it waits doing nothing.
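To make the `poke_interval`/`timeout` interaction concrete, here is a simplified pure-Python sketch of what a sensor's run loop amounts to. This is an illustration only, not Airflow's actual implementation (which also handles soft fails, retries, and logging), and `run_sensor` is a hypothetical helper:

```python
import time


class SensorTimeout(Exception):
    """Raised when the timeout elapses without poke() returning True."""


def run_sensor(poke, poke_interval, timeout, sleep=time.sleep, clock=time.monotonic):
    """Call poke() every poke_interval seconds until it returns True,
    or raise SensorTimeout once timeout seconds have elapsed."""
    started = clock()
    while not poke():
        if clock() - started > timeout:
            raise SensorTimeout("sensor timed out")
        sleep(poke_interval)


# A DummySensor-style poke that never succeeds can only end via the timeout.
# Fake clock/sleep so the demo runs instantly instead of waiting an hour.
fake_now = [0]

def fake_clock():
    return fake_now[0]

def fake_sleep(seconds):
    fake_now[0] += seconds

try:
    run_sensor(poke=lambda: False, poke_interval=600, timeout=3600,
               sleep=fake_sleep, clock=fake_clock)
except SensorTimeout:
    print("task failed after timeout, downstream tasks will not run")
```

With a poke that never returns True, the loop can only end via the timeout, which is exactly why the dummy sensor above sits in RUNNING until something external flips its state.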

answered Sep 20 '22 13:09 by Daniel Huang