I'm having an issue with Airflow when running it on a 24xlarge machine on EC2.
I must note that the parallelism level is 256.
On some days the dagrun finishes with status 'failed' for two undetermined reasons:
Some tasks have the status 'upstream_failed', which is not correct because we can clearly see that all the previous steps were successful.
Other tasks have no status ('null'): they have not started yet, and they cause the dagrun to fail.
I must note that the logs for both of these kinds of tasks are empty.
And here are the task instance details for these cases:
Any solutions, please?
max_active_runs : This is the maximum number of active DAG runs allowed for the DAG in question. Once this limit is hit, the Scheduler will not create new active DAG runs.
You can run a task independently by using the -i/-I/-A flags along with the run command. But yes, the design of Airflow does not permit running a specific task together with all its dependencies.
Airflow's dynamic task mapping feature is built off of the MapReduce programming model. The map procedure takes a set of inputs and creates a single task for each one. The reduce procedure, which is optional, allows a task to operate on the collected output of a mapped task.
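The map and reduce steps described above can be sketched in plain Python. This is only an analogy for the programming model, not Airflow's API: the function names double and total and the input list are made up for illustration.

```python
# Plain-Python analogy of the map/reduce model (not Airflow code).
# "double" and "total" are hypothetical names chosen for this sketch.

def double(x):
    """Map step: called once per input, like one mapped task per element."""
    return x * 2


def total(values):
    """Reduce step: operates on the collected output of the map step."""
    return sum(values)


inputs = [1, 2, 3]

# One "task" per input element.
mapped = [double(x) for x in inputs]

# A single downstream step that consumes all mapped results.
result = total(mapped)

print(mapped, result)
```

In Airflow 2.3 and later, the map step roughly corresponds to calling expand() on a task, and the reduce step to a downstream task that receives the mapped task's collected output.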
To test this, you can run airflow dags list and confirm that your DAG shows up in the list. You can also run airflow tasks list foo_dag_id --tree and confirm that your task shows up in the list as expected.
The other case where I've experienced the second condition (tasks with a 'null' status that never started) is when the task instance has changed, and specifically changed operator type.
I'm hoping you already got an answer / were able to move on. I've been stuck on this issue a few times in the last month, so I figured I would document what I ended up doing to solve the issue.
Example:
As best I can piece together, what's happening is:
... task_instance ... operator ... task_instance with the correct operator type; not seeing it, it updates the associated database record(s) as state = 'removed'.
You can see tasks impacted by this process with the query:
SELECT *
FROM task_instance
WHERE state = 'removed'
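As a rough sketch of how that query behaves, the following uses an in-memory SQLite table as a simplified, hypothetical stand-in for Airflow's task_instance table (the real table has more columns and lives in the metadata database). The sample rows and the removed_tasks helper are assumptions made up for illustration; the extra dag_id filter just narrows the result to one DAG.

```python
import sqlite3

# Simplified, hypothetical stand-in for Airflow's task_instance table.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE task_instance (
        task_id  TEXT,
        dag_id   TEXT,
        operator TEXT,
        state    TEXT
    )
    """
)

# Made-up sample rows: one 'removed' task in the DAG of interest,
# one in another DAG, and one task with no state at all (NULL).
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("extract", "foo_dag_id", "PythonOperator", "success"),
        ("load", "foo_dag_id", "BashOperator", "removed"),
        ("load", "other_dag", "BashOperator", "removed"),
        ("report", "foo_dag_id", "PythonOperator", None),
    ],
)


def removed_tasks(connection, dag_id):
    """Return (task_id, operator) pairs for one DAG's rows marked 'removed'."""
    cursor = connection.execute(
        "SELECT task_id, operator FROM task_instance "
        "WHERE state = 'removed' AND dag_id = ? ORDER BY task_id",
        (dag_id,),
    )
    return cursor.fetchall()


removed = removed_tasks(conn, "foo_dag_id")
print(removed)
```

Selecting the operator column alongside task_id makes it easier to check whether the stored operator type still matches the one in the current DAG definition.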
It looks like there's been work on this issue for Airflow 1.10:
That being said, I'm not 100% sure, based on the commits I can find, that it would resolve this issue. It seems like the general philosophy is still "when a DAG changes, you should increment / change the DAG name".
I don't love that solution, because it makes it hard to iterate on what is fundamentally one pipeline. The alternative I used was to follow (partially) the recommendations from Astronomer and "blow out" the DAG history. In order to do that, you need to: