Ooops... AttributeError when clearing failed task state in Airflow

I am trying to clear a failed task so that it will run again.

I usually do this with the web GUI from the tree view:

[screenshot: tree view showing the failed task and the Clear popup]

After selecting "Clear" I am directed to an error page:

[screenshot: error page]

The traceback on this page is the same error I receive when trying to clear this task using the CLI:

[u@airflow01 ~]# airflow clear -s 2002-07-29T20:25:00 -t coverage_check gom_modis_aqua_coverage_check
[2018-01-16 16:21:04,235] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-01-16 16:21:05,192] {models.py:167} INFO - Filling up the DagBag from /root/airflow/dags
Traceback (most recent call last):
  File "/usr/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/lib/python3.4/site-packages/airflow/bin/cli.py", line 612, in clear
    include_upstream=args.upstream,
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3173, in sub_dag
    dag = copy.deepcopy(self)
  File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
    y = copier(memo)
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3159, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
    y = copier(memo)
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 2202, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib64/python3.4/copy.py", line 182, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/usr/lib64/python3.4/copy.py", line 309, in _reconstruct
    y.__dict__.update(state)
AttributeError: 'NoneType' object has no attribute 'update'

Looking for ideas on what may have caused this, what I should do to fix this task, and how to avoid this in the future.

I was able to work around the issue by deleting the task record using the "Browse > Task Instances" search, but I would still like to understand the cause, as I have seen this happen multiple times.

Although my DAG code is getting complicated, here is an excerpt from where the operator is defined within the DAG:

    trigger_granule_dag_id = 'trigger_' + process_pass_dag_name
    coverage_check = BranchPythonOperator(
        task_id='coverage_check',
        python_callable=_coverage_check,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(hours=3),
        queue=QUEUE.PYCMR,
        op_kwargs={
            'roi': region,
            'success_branch_id': trigger_granule_dag_id
        }
    )

The full source code can be browsed at github/USF-IMARS/imars_dags. Here are links to the most relevant parts:

  • The operator is instantiated in /gom/gom_modis_aqua_coverage_check.py using the modis_aqua_coverage_check factory
  • The factory function defines the coverage_check BranchPythonOperator in /builders/modis_aqua_coverage_check.py
  • The python_callable is the _coverage_check function in the same file
asked Jan 16 '18 by 7yl4r


People also ask

How do you retry a failed task in Airflow?

If you want to re-run a task in Airflow, the best way to do so is to press Clear or Delete (the label depends on the Airflow version you're running), not Run. Hitting this will clear the state of your failed task and allow the scheduler to pick it back up and re-run it.

How do you clear an Airflow task?

Click on the "select all" checkbox at the top of the list to select all of the queued tasks. Now, in the "Actions" menu, select "Clear" and apply it to all of the queued tasks. Confirm your choice to Clear the queued tasks. Airflow should immediately prepare to run the queued tasks.

How do I mark Airflow as failed?

If you want to control your task's state from within custom Task/Operator code, Airflow provides two special exceptions you can raise: AirflowSkipException will mark the current task as skipped, while AirflowFailException will mark the current task as failed, ignoring any remaining retry attempts.
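
For example, a minimal sketch (the callable name and flag parameters are illustrative; in recent Airflow versions both exceptions live in airflow.exceptions):

    from airflow.exceptions import AirflowFailException, AirflowSkipException

    def _my_callable(nothing_to_do=False, unrecoverable=False, **context):
        if nothing_to_do:
            # Marks this task instance as skipped.
            raise AirflowSkipException("nothing to process")
        if unrecoverable:
            # Fails the task immediately, ignoring any remaining retries.
            raise AirflowFailException("unrecoverable error; do not retry")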

Is Start_date mandatory in Airflow DAG?

This is no longer required. Airflow will now auto-align the start_date and the schedule, using the start_date as the moment to start looking.
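
For instance, a minimal illustration (the dag_id, date, and interval are placeholders):

    from datetime import datetime

    from airflow import DAG

    # start_date anchors the schedule; Airflow aligns runs to it automatically.
    dag = DAG(
        dag_id='example_dag',
        start_date=datetime(2018, 1, 1),
        schedule_interval='@daily',
    )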


2 Answers

Below is a sample DAG that I created to mimic the error that you are facing.

from datetime import datetime, timedelta

import boto3
from airflow import DAG
from airflow import configuration as conf
from airflow.operators import ShortCircuitOperator, DummyOperator


def athena_data_validation(**kwargs):
    pass


start_date = datetime.now()

args = {
    'owner': 'airflow',
    'start_date': start_date,
    'depends_on_past': False,
    'wait_for_downstream': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=30)
}

dag_name = 'data_validation_dag'

schedule_interval = None

dag = DAG(
    dag_id=dag_name,
    default_args=args,
    schedule_interval=schedule_interval)

# This module-level boto3 client gets attached to the task via op_kwargs;
# it is the object that fails to deepcopy when the task is cleared.
athena_client = boto3.client('athena', region_name='us-west-2')

DAG_SCRIPTS_DIR = conf.get('core', 'DAGS_FOLDER') + "/data_validation/"

start_task = DummyOperator(task_id='Start_Task', dag=dag)

end_task = DummyOperator(task_id='End_Task', dag=dag)

data_validation_task = ShortCircuitOperator(
    task_id='Data_Validation',
    provide_context=True,
    python_callable=athena_data_validation,
    op_kwargs={
        'athena_client': athena_client,
        'sql_file': DAG_SCRIPTS_DIR + 'data_validation.sql',
        's3_output_path': 's3://XXX/YYY/'
    },
    dag=dag)
data_validation_task.set_upstream(start_task)
data_validation_task.set_downstream(end_task)

After one successful run, I tried to clear the Data_Validation task and got the same error shown in the question (screenshots omitted).

I removed the athena_client object creation from the module level and placed it inside the athena_data_validation function, and then it worked. When we clear a task in the Airflow UI, it tries to do a deepcopy and get all the objects from the previous run. I am still trying to understand why it is not able to get a copy of this object type, but I have a workaround that works for me, sketched below.
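
A minimal sketch of that workaround, assuming the rest of the DAG file above stays the same (and with 'athena_client' dropped from op_kwargs):

    import boto3

    def athena_data_validation(**kwargs):
        # Create the client inside the callable instead of at module level,
        # so no boto3 client is attached to the DAG and the deepcopy
        # performed by "Clear" succeeds. The query logic itself is omitted.
        athena_client = boto3.client('athena', region_name='us-west-2')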

answered Nov 03 '22 by Sai Neelakantam


During some operations, Airflow deep-copies some objects. Unfortunately, some objects do not allow this. The boto client is a good example of something that does not deep-copy nicely, and thread objects are another, but large objects with nested references (like a reference to a parent task) can also cause issues.

In general, you do not want to instantiate a client in the DAG code itself. That said, I do not think that is your issue here, though I do not have access to the pyCMR code to check whether it could be.
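
One quick way to check whether a DAG will survive the deepcopy that "Clear" performs is to attempt the copy directly (a hypothetical diagnostic, not from the original answer; the import path is illustrative):

    import copy

    from gom.gom_modis_aqua_coverage_check import dag  # illustrative import

    # If any attribute on the DAG or its tasks cannot be deep-copied,
    # this raises the same AttributeError that clearing the task triggers.
    copy.deepcopy(dag)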

answered Nov 03 '22 by artwr