
How do I force a task in Airflow to fail?

Tags:

python

airflow

I have a Python callable process_csv_entries that processes CSV file entries. I want my task to complete successfully only if all entries were processed successfully. The task should fail otherwise.

def process_csv_entries(csv_file):
    # Boolean
    file_completely_parsed = <call_to_module_to_parse_csv>
    return not file_completely_parsed

CSV_FILE = <Sets path to csv file>
t1 = PythonOperator(dag=dag,
                    task_id='parse_csv_completely',
                    python_callable=process_csv_entries,
                    op_args=[CSV_FILE])

t1 seems to complete successfully irrespective of the returned value. How do I force a PythonOperator task to fail?

Mask asked Mar 30 '17 07:03



2 Answers

Raise an exception when you hit the error condition (in your case, when the file is not successfully parsed):

raise ValueError('File not parsed completely/correctly')

Raise a relevant error type with a suitable message.
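A minimal sketch of this approach. It assumes a hypothetical parse_csv helper that stands in for the asker's parsing module and returns True only when every row looks complete. The callable raises instead of returning a boolean, since the return value of the callable does not decide whether the task succeeds.

```python
import csv


def parse_csv(csv_file):
    """Hypothetical stand-in for the real parsing module (an assumption).

    Returns True only if every row has as many fields as the header row.
    """
    with open(csv_file, newline="") as handle:
        rows = list(csv.reader(handle))
    if not rows:
        return False
    width = len(rows[0])
    return all(len(row) == width for row in rows[1:])


def process_csv_entries(csv_file):
    file_completely_parsed = parse_csv(csv_file)
    if not file_completely_parsed:
        # An uncaught exception is what makes the task fail
        # (or go up for retry, if the task has retries left).
        raise ValueError("File not parsed completely/correctly: %s" % csv_file)
    return True


# Wiring into the DAG, as in the question (needs Airflow and a dag object):
# t1 = PythonOperator(dag=dag,
#                     task_id='parse_csv_completely',
#                     python_callable=process_csv_entries,
#                     op_args=[CSV_FILE])
```

With this shape, a fully parsed file lets the task finish normally, and anything else surfaces the error message in the task log.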

Priyank Mehta answered Sep 23 '22 08:09


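Yes, raise AirflowException; this causes the task to fail.

from airflow.exceptions import AirflowException

Note that, like ValueError or any other uncaught exception, AirflowException still goes through the task's retries setting before the task ends up in the failed state. To fail immediately without retrying, newer Airflow releases provide AirflowFailException in the same module.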
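A short sketch of raising AirflowException from a callable. The helper name fail_unless_parsed is hypothetical, and the ImportError fallback exists only so the snippet can be tried on a machine without Airflow installed; in a real DAG file the import from airflow.exceptions is all that is needed. Whether the task is retried before failing depends on its retries setting and the Airflow version.

```python
try:
    from airflow.exceptions import AirflowException
except ImportError:
    # Assumption for illustration only: a stand-in class so the sketch
    # runs without Airflow installed. Do not use this in a real DAG.
    class AirflowException(Exception):
        pass


def fail_unless_parsed(file_completely_parsed):
    """Hypothetical helper: raise when the parse flag is false."""
    if not file_completely_parsed:
        raise AirflowException("CSV file was not parsed completely")
    return "parsed"
```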

Alon Rolnik answered Sep 21 '22 08:09