I have an Airflow HttpSensor that calls a REST endpoint and checks for a specific value in the JSON structure returned by the API:
sensor = HttpSensor(
    soft_fail=True,
    task_id='http_sensor_check',
    http_conn_id='http_default',
    endpoint='http://localhost:8082/api/v1/resources/games/all',
    request_params={},
    response_check=lambda response: True if check_api_response(response) is True else False,
    mode='reschedule',
    dag=dag)
If the response_check returns False, the sensor task is put in an "up_for_reschedule" state. The issue is that it stayed in that state forever and never got rescheduled.
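The body of check_api_response is not shown in the question, so the following is only a minimal sketch of what such a check could look like. The field name "status", the expected value "ready", and the fake_response helper are all assumptions made for illustration; the only thing taken from the question is that the callable receives the HTTP response and must return a boolean.

```python
import json
from types import SimpleNamespace


def check_api_response(response, field="status", expected="ready"):
    """Return True only if the JSON body is an object with field == expected.

    `field` and `expected` are hypothetical; the real API schema is unknown.
    """
    try:
        payload = response.json()
    except ValueError:
        # Body was not valid JSON: treat it as "condition not met".
        return False
    if not isinstance(payload, dict):
        return False
    return payload.get(field) == expected


def fake_response(body):
    """Stand-in for an HTTP response object, for trying the check locally."""
    return SimpleNamespace(json=lambda: json.loads(body))


print(check_api_response(fake_response('{"status": "ready"}')))
print(check_api_response(fake_response('{"status": "pending"}')))
print(check_api_response(fake_response("not json")))
```

Because the function already returns a plain bool, it could be passed directly as response_check=check_api_response without wrapping it in a lambda.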
My questions are:
Thank you in advance.
In a sensor, mode='reschedule' means that if the sensor's criteria aren't met yet, the sensor releases its worker slot so other tasks can use it. This is very useful when a sensor may have to wait for a long time.
Sensors are a special type of Operator that are designed to do exactly one thing - wait for something to occur. It can be time-based, or waiting for a file, or an external event, but all they do is wait until something happens, and then succeed so their downstream tasks can run.
From the Airflow documentation: soft_fail defines what happens if the sensor fails. If set to False (the default), the sensor is allowed to retry; if set to True, the task is marked as skipped on failure. If you want the sensor to keep retrying, make sure soft_fail is set to False or left at its default.
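The effect of soft_fail on the final outcome can be illustrated with a small pure-Python model. This is not Airflow code: run_sensor is a hypothetical function, and the timeout is modelled as a maximum number of pokes rather than as elapsed seconds.

```python
def run_sensor(poke_results, max_pokes, soft_fail):
    """Model the terminal state of a sensor.

    poke_results: booleans, one per poke (True means the criteria were met).
    max_pokes:    stand-in for the sensor timeout (an assumption of this model).
    soft_fail:    if True, giving up ends in "skipped" instead of "failed".
    """
    for criteria_met in poke_results[:max_pokes]:
        if criteria_met:
            return "success"
    # The condition was never met within the allowed pokes.
    return "skipped" if soft_fail else "failed"


print(run_sensor([False, False, True], max_pokes=5, soft_fail=True))
print(run_sensor([False, False, False], max_pokes=3, soft_fail=True))
print(run_sensor([False, False, False], max_pokes=3, soft_fail=False))
```

In this model the sensor from the question (soft_fail=True) would end as skipped rather than failed once it gives up.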
One of the primary DAG-level Airflow settings that users can define in code is max_active_runs: the maximum number of active DAG runs allowed for the DAG in question. Once this limit is hit, the scheduler will not create new active DAG runs.
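The max_active_runs rule can be written down as a one-line check. The sketch below is a simplified model, not the scheduler's actual implementation, and can_create_run is a hypothetical name.

```python
def can_create_run(active_runs: int, max_active_runs: int) -> bool:
    """True if a new DAG run may be created under the max_active_runs limit."""
    return active_runs < max_active_runs


# With max_active_runs=1, one run that is still active blocks any new run.
print(can_create_run(active_runs=0, max_active_runs=1))
print(can_create_run(active_runs=1, max_active_runs=1))
```

A run whose sensor is still waiting in up_for_reschedule counts as active, so with a limit of 1 no further run is created until that run finishes.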
Note that in the default poke mode the sensor holds onto a worker slot and a pool slot for the whole duration of its runtime. When mode is set to reschedule, the sensor task frees the worker slot whenever the criteria are not yet met and is rescheduled for a later time.
If you set your sensor to the “reschedule” mode, it will free the slot when the condition/criteria is not met and it is rescheduled later. Concretely, between each poke_interval, when the sensor is not checking your criteria anymore, the slot is released and your sensor gets the status “up_for_reschedule”.
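To make the difference between the two modes concrete, here is a rough back-of-the-envelope model of how long a worker slot is occupied. It is a simplification: slot_seconds is a hypothetical helper, each poke is assumed to take a fixed poke_duration, and scheduling overhead is ignored.

```python
def slot_seconds(mode, pokes_until_true, poke_interval, poke_duration=1.0):
    """Seconds a worker slot is held until the sensor succeeds.

    pokes_until_true: number of pokes, including the final successful one.
    poke_interval:    seconds waited between two consecutive pokes.
    poke_duration:    assumed time one poke takes (an assumption of this model).
    """
    if pokes_until_true < 1:
        raise ValueError("pokes_until_true must be at least 1")
    checking = pokes_until_true * poke_duration
    waiting = (pokes_until_true - 1) * poke_interval
    if mode == "poke":
        # The slot is held while checking and while sleeping between pokes.
        return checking + waiting
    if mode == "reschedule":
        # The slot is released between pokes, so only checking time counts.
        return checking
    raise ValueError(f"unknown mode: {mode!r}")


print(slot_seconds("poke", pokes_until_true=10, poke_interval=60))        # 550.0
print(slot_seconds("reschedule", pokes_until_true=10, poke_interval=60))  # 10.0
```

Under these assumptions the saving grows with the poke interval, which is why reschedule mode is usually suggested for sensors that wait a long time between checks.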
The workflow is built with Apache Airflow’s DAG (Directed Acyclic Graph), which has nodes and connectors. A Dependency Tree is created by connecting nodes with connectors. Dynamic Integration: Airflow generates dynamic pipelines using Python as the backend programming language.
Airflow tasks should ideally progress from none to scheduled, queued, running, and finally success. Any custom task (operator) receives a copy of the task instance when it runs; the task instance has methods for things like XComs as well as the ability to inspect task metadata.
up_for_reschedule means that the sensor condition isn't true yet and the sensor hasn't reached its timeout, so the task is waiting to be rescheduled by the scheduler. To allow only one active run of the DAG at a time, set max_active_runs=1 in the DAG constructor.