 

Apache Airflow scheduler not scheduling jobs


I'm working with Apache Airflow 1.8.0.

Here is the output when I backfill the job:

[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>
[2017-04-13 09:42:55,864] {models.py:1120} INFO - Dependencies not met for <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 3 non-success(es). upstream_tasks_state={'skipped': Decimal('0'), 'successes': Decimal('0'), 'done': 0, 'upstream_failed': Decimal('0'), 'failed': Decimal('0')}, upstream_task_ids=['runme_0', 'runme_1', 'runme_2']

When I try to schedule any DAG, it throws this error:

Traceback (most recent call last):
  File "/anaconda3/bin/airflow", line 28, in <module>
    args.func(args)
  File "/anaconda3/lib/python3.5/site-packages/airflow/bin/cli.py", line 167, in backfill
    pool=args.pool)
  File "/anaconda3/lib/python3.5/site-packages/airflow/models.py", line 3330, in run
    job.run()
  File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 2021, in _execute
    raise AirflowException(err)
airflow.exceptions.AirflowException: ---------------------------------------------------

Here is the output about the tasks:

BackfillJob is deadlocked. These tasks have succeeded:
set()
 These tasks have started:
{}
 These tasks have failed:
set()
 These tasks are skipped:
set()
 These tasks are deadlocked:
{<TaskInstance: example_bash_operator.runme_0 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:44:00 [scheduled]>}

Tested with Python 2.7 and Python 3.5.

Used both SequentialExecutor and LocalExecutor.

PS: If I backfill the DAG at the current time, it executes once and then throws the above error for all the scheduled tasks.

Mubin asked Apr 13 '17 14:04


People also ask

How do I know if my Airflow scheduler is working?

CLI check for the scheduler: the scheduler creates a BaseJob entry with information about the host and timestamp (heartbeat) at startup, and then updates it regularly. You can use this entry to check whether the scheduler is working correctly with the airflow jobs check command (available in Airflow 2.x). On failure, the command exits with a non-zero error code.
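As a minimal sketch, assuming Airflow 2.x with the airflow executable on the PATH, a monitoring script could wrap this check in Python and treat any non-zero exit code as unhealthy. The helper names scheduler_check_command and scheduler_is_healthy are hypothetical; the command runner is injectable so the logic can be exercised without a live Airflow install.

```python
import socket
import subprocess
from typing import Callable, List, Optional


def scheduler_check_command(hostname: str) -> List[str]:
    """Build the Airflow 2.x CLI call that looks for a live SchedulerJob on one host."""
    return ["airflow", "jobs", "check", "--job-type", "SchedulerJob", "--hostname", hostname]


def scheduler_is_healthy(run: Callable = subprocess.run,
                         hostname: Optional[str] = None) -> bool:
    """Return True if `airflow jobs check` exits with code 0 (hypothetical helper)."""
    cmd = scheduler_check_command(hostname or socket.gethostname())
    try:
        result = run(cmd)
    except FileNotFoundError:
        # The airflow executable is not installed or not on PATH.
        return False
    # The CLI signals failure with a non-zero exit code.
    return result.returncode == 0
```

Calling scheduler_is_healthy() with no arguments runs the real CLI for the local hostname; a cron job or container health probe could then alert when it returns False.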

How do you schedule an Airflow job?

To start the Airflow scheduler, run the airflow scheduler command. It uses the configuration specified in airflow.cfg. The scheduler runs jobs with a schedule_interval AFTER the start date, at the END of each period: the run covering a given period is triggered only once that period has ended.
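To make "at the END of the period" concrete, here is a simplified model, assuming a fixed timedelta interval and ignoring cron expressions, catchup settings, and time zones. The function name period_end_schedule is hypothetical; it pairs each run's execution date (the start of its period) with the earliest time the scheduler would trigger it (the end of that period).

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def period_end_schedule(start_date: datetime, interval: timedelta,
                        n: int) -> List[Tuple[datetime, datetime]]:
    """Return (execution_date, earliest trigger time) pairs for the first n runs."""
    runs = []
    execution_date = start_date
    for _ in range(n):
        # The run labelled execution_date only fires once its period is over.
        runs.append((execution_date, execution_date + interval))
        execution_date += interval
    return runs


for logical, fires_at in period_end_schedule(datetime(2017, 4, 13), timedelta(days=1), 2):
    print(f"run for {logical:%Y-%m-%d} fires at or after {fires_at:%Y-%m-%d %H:%M}")
# run for 2017-04-13 fires at or after 2017-04-14 00:00
# run for 2017-04-14 fires at or after 2017-04-15 00:00
```

This is why a daily DAG with a start date of today does not run until tomorrow.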

How does Apache Airflow scheduler work?

The Airflow scheduler monitors all tasks and all DAGs, and triggers the task instances whose dependencies have been met. Behind the scenes, it monitors and stays in sync with a folder for all DAG objects it may contain, and periodically (every minute or so) inspects active tasks to see whether they can be triggered.
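A toy version of that dependency check, assuming every task uses the default all_success trigger rule: a task instance becomes runnable only when it has not run yet and all of its upstream tasks have succeeded. The DAG shape loosely mirrors example_bash_operator from the question (run_after_loop waits on runme_0, runme_1 and runme_2, as the log shows); UPSTREAM and ready_tasks are illustrative names, not Airflow APIs.

```python
from typing import Dict, List, Optional

# Hypothetical task -> upstream task ids, shaped like example_bash_operator.
UPSTREAM: Dict[str, List[str]] = {
    "runme_0": [], "runme_1": [], "runme_2": [],
    "also_run_this": [],
    "run_after_loop": ["runme_0", "runme_1", "runme_2"],
    "run_this_last": ["run_after_loop", "also_run_this"],
}


def ready_tasks(upstream: Dict[str, List[str]],
                state: Dict[str, Optional[str]]) -> List[str]:
    """Tasks that have not run yet and whose upstreams all succeeded (all_success)."""
    return sorted(
        task for task, ups in upstream.items()
        if state.get(task) is None and all(state.get(u) == "success" for u in ups)
    )


print(ready_tasks(UPSTREAM, {}))
# ['also_run_this', 'runme_0', 'runme_1', 'runme_2']
print(ready_tasks(UPSTREAM, {"runme_0": "success", "runme_1": "success", "runme_2": "success"}))
# ['also_run_this', 'run_after_loop']
```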

Is Start_date mandatory in Airflow DAG?

When creating a new DAG, you probably want to set a global start_date for your tasks. This can be done by declaring your start_date directly in the DAG() object. The first DagRun to be created will be based on the min(start_date) for all your tasks.
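A small sketch of how the first run date follows from the task start dates, assuming (as Airflow does when a task is added to a DAG) that a task without its own start_date inherits the DAG-level one. The function first_dag_run_date and the task names are hypothetical.

```python
from datetime import datetime
from typing import Dict, Optional


def first_dag_run_date(dag_start_date: datetime,
                       task_start_dates: Dict[str, Optional[datetime]]) -> datetime:
    """Earliest effective start_date across all tasks (hypothetical helper)."""
    # A task with start_date=None falls back to the DAG-level start_date.
    effective = [d if d is not None else dag_start_date for d in task_start_dates.values()]
    return min(effective, default=dag_start_date)


print(first_dag_run_date(datetime(2017, 4, 13), {"extract": None, "load": datetime(2017, 4, 10)}))
# 2017-04-10 00:00:00
```

Setting the start_date once on the DAG (or in default_args) keeps every task on the same date and avoids an unexpectedly early first run caused by one task's older start_date.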


1 Answer

Your Airflow instance is in a deadlocked state: a task that has failed is preventing future runs of the task.

Airflow launches each task in each DAG run as a separate process, and when a task fails and the failure is not handled, a deadlock can arise.
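This explanation can be illustrated with a toy backfill loop, assuming the all_success trigger rule everywhere: it keeps running tasks whose upstreams all succeeded until no further progress is possible, and reports whatever never ran as deadlocked. In this simplified model nothing marks blocked tasks as upstream_failed (a state real Airflow does have, as the log above shows), so one failure leaves its downstream tasks stuck. It is not Airflow's actual BackfillJob logic, and toy_backfill is a hypothetical name.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical task -> upstream task ids, shaped like example_bash_operator.
UPSTREAM: Dict[str, List[str]] = {
    "runme_0": [], "runme_1": [], "runme_2": [],
    "also_run_this": [],
    "run_after_loop": ["runme_0", "runme_1", "runme_2"],
    "run_this_last": ["run_after_loop", "also_run_this"],
}


def toy_backfill(upstream: Dict[str, List[str]],
                 outcome: Dict[str, str]) -> Tuple[Set[str], Set[str], Set[str]]:
    """Return (succeeded, failed, deadlocked); outcome maps task -> 'success'/'failed'."""
    state: Dict[str, str] = {}
    while True:
        ready = [t for t, ups in upstream.items()
                 if t not in state and all(state.get(u) == "success" for u in ups)]
        if not ready:
            break  # nothing can make progress any more
        for t in ready:
            state[t] = outcome.get(t, "success")  # tasks succeed unless told otherwise
    succeeded = {t for t, s in state.items() if s == "success"}
    failed = {t for t, s in state.items() if s == "failed"}
    deadlocked = set(upstream) - set(state)  # never became runnable
    return succeeded, failed, deadlocked


succeeded, failed, deadlocked = toy_backfill(UPSTREAM, {"runme_0": "failed"})
print(sorted(deadlocked))  # ['run_after_loop', 'run_this_last']
```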

To resolve this situation you can do one of the following:

  1. Use airflow clear <dag_id>. This clears the DAG's task instances, which should resolve the deadlock and allow future runs of the DAG/task.
  2. If the above does not solve the issue, use airflow resetdb. This wipes the entire Airflow metadata database (all DAG run and task instance history), which resolves the issue but is destructive.

In the future:

  • Set a timeout such as execution_timeout=timedelta(minutes=2) so that you have explicit control over how long an operator may run.
  • Also, provide an on_failure_callback (for example, on_failure_callback=handle_failure) so that the operator exits cleanly on failure.
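Both suggestions can be combined in default_args so every task in the DAG picks them up. This is a sketch: handle_failure is a hypothetical callback (the answer names it but does not define it) that only logs the failing task id, and the dictionary would be passed as DAG(..., default_args=default_args) in a real DAG file. The snippet avoids importing Airflow so it runs on its own.

```python
from datetime import timedelta


def handle_failure(context):
    """Hypothetical on_failure_callback; Airflow calls it with the task's context dict."""
    ti = context.get("task_instance")
    task_id = getattr(ti, "task_id", "<unknown>")
    # Put cleanup or alerting here; keep it short so it cannot hang itself.
    print(f"Task {task_id} failed; running cleanup")
    return task_id  # Airflow ignores the return value; returned here for testing


default_args = {
    # Airflow fails the task (subject to retries) if it runs longer than this.
    "execution_timeout": timedelta(minutes=2),
    "on_failure_callback": handle_failure,
}
```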

Hope this helps,

Cheers!

Priyank Mehta answered Sep 23 '22 10:09