
How does the mode "reschedule" in Airflow Sensors work?

I have an Airflow HTTP sensor that calls a REST endpoint and checks for a specific value in the JSON structure returned by the API:

sensor = HttpSensor(
    soft_fail=True,
    task_id='http_sensor_check',
    http_conn_id='http_default',
    endpoint='http://localhost:8082/api/v1/resources/games/all',
    request_params={},
    response_check=lambda response: check_api_response(response) is True,
    mode='reschedule',
    dag=dag)

If response_check returns False, the DAG is put in an "up_for_reschedule" state. The issue is that the DAG stayed in that state forever and never got rescheduled.

My questions are:

  • What does "up_for_reschedule" mean, and when would the DAG be rescheduled?
  • Suppose my DAG is scheduled to run every 5 minutes, but because of the sensor, the "up_for_reschedule" DAG instance overlaps with the next run. Will I have 2 DAG runs executing at the same time?

Thank you in advance.

Asked by OCDev, Mar 19 '21 21:03





1 Answer

In a sensor, mode='reschedule' means that if the sensor's criteria isn't met yet, the sensor releases its worker slot so other tasks can use it. This is very useful when a sensor may have to wait for a long time.
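To make the difference concrete, here is a minimal pure-Python sketch that models how long a worker slot is occupied in each mode. It is not Airflow code: the names simulate_sensor, SensorRun, ready_at and check_duration are made up for illustration, and it assumes the scheduler picks the task up again exactly one poke_interval after each failed check, which a real scheduler does not guarantee.

```python
from dataclasses import dataclass


@dataclass
class SensorRun:
    checks: int          # how many times the condition was evaluated
    slot_seconds: float  # total simulated time a worker slot was occupied
    finished_at: float   # simulated time at which the sensor succeeded


def simulate_sensor(mode, ready_at, poke_interval=60.0, check_duration=1.0):
    """Simulate a sensor whose condition becomes true at time `ready_at`.

    Simplified model (an assumption, not Airflow's implementation):
    every check takes `check_duration` seconds on a worker slot, and the
    next check starts exactly `poke_interval` seconds after a failed one.
    """
    if mode not in ("poke", "reschedule"):
        raise ValueError(f"unknown mode: {mode!r}")

    now = 0.0
    checks = 0
    slot_seconds = 0.0
    while True:
        # Evaluating the condition always needs a worker slot.
        checks += 1
        now += check_duration
        slot_seconds += check_duration
        if now >= ready_at:
            return SensorRun(checks, slot_seconds, now)

        if mode == "poke":
            # The task sleeps between checks but keeps its slot.
            slot_seconds += poke_interval
        # In "reschedule" mode the slot is released here; the task would
        # show as up_for_reschedule until its next check.
        now += poke_interval


if __name__ == "__main__":
    for mode in ("poke", "reschedule"):
        print(mode, simulate_sensor(mode, ready_at=300.0))
```

In this model both modes finish at the same time and run the same number of checks; only the time spent holding a worker slot differs.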

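  1. up_for_reschedule means that the sensor's condition isn't true yet and the sensor hasn't reached its timeout, so the task is waiting to be rescheduled by the scheduler. Note that this is a state of the sensor task, not of the DAG as a whole.
  2. You can't know exactly when the task will run again. It becomes eligible once its poke_interval has elapsed, but the actual start depends on the scheduler (available resources, priorities, etc.). If you don't want to allow parallel DAG runs, set max_active_runs=1 in the DAG constructor.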
Answered by Elad Kalif, Oct 17 '22 13:10