I am trying to clear a failed task so that it will run again.
I usually do this via the web GUI from the tree view.
After selecting "Clear" I am directed to an error page:
The traceback on this page is the same error I receive when trying to clear this task using the CLI:
[u@airflow01 ~]# airflow clear -s 2002-07-29T20:25:00 -t coverage_check gom_modis_aqua_coverage_check
[2018-01-16 16:21:04,235] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-01-16 16:21:05,192] {models.py:167} INFO - Filling up the DagBag from /root/airflow/dags
Traceback (most recent call last):
  File "/usr/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/lib/python3.4/site-packages/airflow/bin/cli.py", line 612, in clear
    include_upstream=args.upstream,
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3173, in sub_dag
    dag = copy.deepcopy(self)
  File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
    y = copier(memo)
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3159, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
    y = copier(memo)
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 2202, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib64/python3.4/copy.py", line 182, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/usr/lib64/python3.4/copy.py", line 309, in _reconstruct
    y.__dict__.update(state)
AttributeError: 'NoneType' object has no attribute 'update'
Looking for ideas on what may have caused this, what I should do to fix this task, and how to avoid this in the future.
I was able to work around the issue by deleting the task record using the "Browse > Task Instances" search, but would still like to explore the issue as I have seen this multiple times.
Although my DAG code is getting complicated, here is an excerpt showing where the operator is defined within the DAG:
trigger_granule_dag_id = 'trigger_' + process_pass_dag_name
coverage_check = BranchPythonOperator(
    task_id='coverage_check',
    python_callable=_coverage_check,
    provide_context=True,
    retries=10,
    retry_delay=timedelta(hours=3),
    queue=QUEUE.PYCMR,
    op_kwargs={
        'roi': region,
        'success_branch_id': trigger_granule_dag_id
    }
)
The full source code can be browsed at github/USF-IMARS/imars_dags.
If you want to re-run a task in Airflow, the best way to do so is to press "Clear" or "Delete" (the wording depends on the Airflow version you're running), not "Run". This clears the state of your failed task and allows the scheduler to pick it back up and re-run it.
Click the "select all" checkbox at the top of the list to select all of the queued tasks. Then, in the "Actions" menu, select "Clear" and apply it to all of the queued tasks. Confirm your choice to clear them; Airflow should immediately prepare to run the queued tasks.
If you want to control your task's state from within custom task/operator code, Airflow provides two special exceptions you can raise: AirflowSkipException will mark the current task as skipped, and AirflowFailException will mark the current task as failed, ignoring any remaining retry attempts.
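For example, here is a minimal sketch of a callable that raises these exceptions. The task ids, callable, and threshold are hypothetical, and the imports assume Airflow 1.10-style module paths (matching the rest of this thread):

from airflow.exceptions import AirflowFailException, AirflowSkipException
from airflow.operators.python_operator import PythonOperator

def _quality_check(**context):
    # Hypothetical check; replace with your own logic.
    record_count = context['ti'].xcom_pull(task_ids='load_data')
    if record_count is None:
        # Nothing to process this run: mark the task as skipped.
        raise AirflowSkipException('no data for this run')
    if record_count == 0:
        # Fail immediately, ignoring any remaining retries.
        raise AirflowFailException('load produced zero records')

quality_check = PythonOperator(
    task_id='quality_check',
    python_callable=_quality_check,
    provide_context=True,
    dag=dag)  # assumes a DAG object defined elsewhere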
Below is a sample DAG that I created to mimic the error that you are facing.
import logging
import os
from datetime import datetime, timedelta

import boto3

from airflow import DAG
from airflow import configuration as conf
from airflow.operators import ShortCircuitOperator, PythonOperator, DummyOperator


def athena_data_validation(**kwargs):
    pass


start_date = datetime.now()

args = {
    'owner': 'airflow',
    'start_date': start_date,
    'depends_on_past': False,
    'wait_for_downstream': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=30)
}

dag_name = 'data_validation_dag'
schedule_interval = None

dag = DAG(
    dag_id=dag_name,
    default_args=args,
    schedule_interval=schedule_interval)

athena_client = boto3.client('athena', region_name='us-west-2')
DAG_SCRIPTS_DIR = conf.get('core', 'DAGS_FOLDER') + "/data_validation/"

start_task = DummyOperator(task_id='Start_Task', dag=dag)
end_task = DummyOperator(task_id='End_Task', dag=dag)

data_validation_task = ShortCircuitOperator(
    task_id='Data_Validation',
    provide_context=True,
    python_callable=athena_data_validation,
    op_kwargs={
        'athena_client': athena_client,
        'sql_file': DAG_SCRIPTS_DIR + 'data_validation.sql',
        's3_output_path': 's3://XXX/YYY/'
    },
    dag=dag)

data_validation_task.set_upstream(start_task)
data_validation_task.set_downstream(end_task)
After one successful run, I tried to clear the Data_Validation task and got the same error. I removed the athena_client object creation from the DAG-level code and placed it inside the athena_data_validation function, and then clearing worked. So when we do a clear in the Airflow UI, it tries to do a deepcopy and get all the objects from the previous run. I am still trying to understand why it is not able to copy that object type, but this workaround has been working for me.
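For reference, a minimal sketch of that workaround against the sample DAG above (only the placement of the boto3 client changes):

def athena_data_validation(**kwargs):
    # Build the client inside the callable so the DAG and task objects
    # never hold a reference to it; nothing non-copyable is left for the
    # clear operation to deepcopy.
    athena_client = boto3.client('athena', region_name='us-west-2')
    # ... run the validation query with athena_client here ...

data_validation_task = ShortCircuitOperator(
    task_id='Data_Validation',
    provide_context=True,
    python_callable=athena_data_validation,
    op_kwargs={
        'sql_file': DAG_SCRIPTS_DIR + 'data_validation.sql',
        's3_output_path': 's3://XXX/YYY/'
    },
    dag=dag)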
During some operations, Airflow deep copies some objects. Unfortunately, some objects do not allow this. The boto client is a good example of something that does not deep copy nicely; thread objects are another, but large objects with nested references, such as a reference to a parent task, can also cause issues.
In general, you do not want to instantiate a client in the DAG code itself. That said, I do not think that is your issue here, though I do not have access to the pyCMR code to see whether it could be a factor.
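A small, self-contained illustration of that failure mode (using a thread lock rather than a boto client, since locks reliably refuse to be copied; the class name is made up):

import copy
import threading

class TaskWithHandle(object):
    def __init__(self):
        # Stand-in for any handle that cannot be copied
        # (network connections, boto clients, open files, ...).
        self._lock = threading.Lock()

copy.deepcopy(TaskWithHandle())  # raises TypeError: lock objects cannot be copied

The exact exception depends on the attribute being copied (a TypeError here, the AttributeError in the traceback above), but in both cases the deepcopy that clear performs on the DAG trips over an attribute that does not copy cleanly; keeping such objects out of DAG-level scope avoids it.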