 

Airflow 2.0 - Scheduler is unable to find serialized DAG in the serialized_dag table

I have two files inside the dags directory: dag_1.py and dag_2.py.

dag_1.py creates a static DAG, and dag_2.py creates dynamic DAGs based on external JSON files at some location.

The static DAG (created by dag_1.py) contains a task, at a later stage, that generates some of these input JSON files for dag_2.py. This is how the dynamic DAGs get created.
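The question does not show dag_2.py, so the following is only a minimal sketch of the discovery half of such a file, assuming one JSON object per *.json file with an optional "dag_id" key (both assumptions). The helper name load_dag_configs is hypothetical. It skips files that cannot be parsed yet, since the static DAG's task may still be writing them when the DAG processor parses the folder.

```python
import json
import tempfile
from pathlib import Path


def load_dag_configs(config_dir):
    """Return {dag_id: config} for every *.json file in config_dir.

    Hypothetical helper: the file layout and the "dag_id" key are assumptions.
    """
    configs = {}
    for path in sorted(Path(config_dir).glob("*.json")):
        try:
            config = json.loads(path.read_text())
        except json.JSONDecodeError:
            # A file still being written by the static DAG's task may be
            # incomplete; skip it and pick it up on a later parse.
            continue
        dag_id = config.get("dag_id", path.stem)  # fall back to the file name
        configs[dag_id] = config
    return configs


# In a real dag_2.py each config would be turned into an Airflow DAG object and
# bound to a module-level name (e.g. globals()[dag_id] = DAG(dag_id, ...)) so
# the DAG processor discovers it. That part needs Airflow and is omitted here.
if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "dynamic_dag_1.json").write_text(json.dumps({"dag_id": "dynamic_dag_1"}))
        Path(tmp, "partial.json").write_text('{"dag_id": ')  # truncated write
        print(sorted(load_dag_configs(tmp)))  # ['dynamic_dag_1']
```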

This used to work fine with Airflow 1.x, where DAG serialization was not used. With Airflow 2.0, however, DAG serialization has become mandatory. Sometimes I get the following exception in the scheduler when the dynamic DAGs are spawned:

[2021-01-02 06:17:39,493] {scheduler_job.py:1293} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
    self._run_scheduler_loop()
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1474, in _do_scheduling
    self._create_dag_runs(query.all(), session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1557, in _create_dag_runs
    dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 171, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 227, in _add_dag_from_db
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'dynamic_dag_1' not found in serialized_dag table

After this, the scheduler terminates, which is expected. When I check the table manually after this error, I can see the DAG entry in it.

This issue is not reproducible every time. What could be the cause? Is there any Airflow configuration I should try tweaking?

arch DJ asked Dec 22 '22 16:12


2 Answers

We had the same issue after updating in the following order:

  1. 1.10.12 -> 1.10.14
  2. 1.10.14 -> 2.0.0

I followed their upgrade guide all the way through, and we had no issues until, at some random point a few hours later, the scheduler started crashing, complaining that random DAGs could not be found in the database.

Our deployment procedure involves clearing out the /opt/airflow/dags folder and doing a clean install every time (we store DAGs and supporting code in Python packages).

So every now and then on the 1.10.x versions, the scheduler parsed an empty folder and wiped the serialized DAGs from the database, but it was always able to restore them on the next parse.
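The wipe-and-restore effect described above can be pictured with a toy model, in which a plain dict stands in for the serialized_dag table and each parse makes the table match whatever DAG files were found. This is not Airflow's code; sync_serialized_dags is a hypothetical name, and the model only mirrors the behaviour reported in this answer.

```python
def sync_serialized_dags(serialized, parsed_dag_ids):
    """Toy model: make the serialized store match what the last parse found.

    `serialized` is a dict standing in for the serialized_dag table.
    """
    for dag_id in list(serialized):
        if dag_id not in parsed_dag_ids:
            del serialized[dag_id]  # DAG file not seen in this parse: row removed
    for dag_id in parsed_dag_ids:
        serialized[dag_id] = {"dag_id": dag_id}  # (re)serialized row
    return serialized


table = {}
sync_serialized_dags(table, {"static_dag", "dynamic_dag_1"})
sync_serialized_dags(table, set())  # a parse that hits the emptied dags folder
print(sorted(table))  # []
sync_serialized_dags(table, {"static_dag", "dynamic_dag_1"})  # next parse after the clean install
print(sorted(table))  # ['dynamic_dag_1', 'static_dag']
```

Anything that reads the table between the empty parse and the next full parse sees the DAGs missing, which is the window the rest of this answer is about.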

Apparently, in 2.0, as part of the effort to make the scheduler highly available (HA), the DAG processor and the scheduler were fully separated. This leads to a race condition:

  • if the scheduler job queries the database before the DAG processor has updated the serialized_dag table, it finds nothing and crashes
  • if luck is on your side, this does not happen and you won't see the exception
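The race can be reproduced deterministically with a toy SQLite database. The two tables are loosely modelled on Airflow's dag and serialized_dag tables, and SerializedDagNotFound is a local stand-in named after the real exception. The write order (the dag row first, the serialized row second) is an assumption chosen to show one interleaving that yields the error in the traceback. It is not a claim about Airflow's internals.

```python
import sqlite3


class SerializedDagNotFound(Exception):
    """Local stand-in named after airflow.exceptions.SerializedDagNotFound."""


def make_db():
    # Toy schema loosely modelled on Airflow's `dag` and `serialized_dag` tables.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, data TEXT)")
    return conn


def scheduler_pass(conn):
    """Look up the serialized form of every DAG the `dag` table knows about."""
    found = []
    for (dag_id,) in conn.execute("SELECT dag_id FROM dag ORDER BY dag_id").fetchall():
        row = conn.execute(
            "SELECT data FROM serialized_dag WHERE dag_id = ?", (dag_id,)
        ).fetchone()
        if row is None:
            raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
        found.append(dag_id)
    return found


conn = make_db()
# Hypothetical processor step 1: the `dag` row lands first...
conn.execute("INSERT INTO dag VALUES ('dynamic_dag_1')")
try:
    scheduler_pass(conn)  # ...and the scheduler runs before step 2.
except SerializedDagNotFound as exc:
    print("crash:", exc)
# Step 2: the serialized row is written, so the next pass succeeds.
conn.execute("INSERT INTO serialized_dag VALUES ('dynamic_dag_1', '{}')")
print(scheduler_pass(conn))  # ['dynamic_dag_1']
```

This also explains why the row is present when the table is checked by hand after the crash: it was written moments after the scheduler looked.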

To get rid of this problem, I disabled scheduling of all DAGs by setting is_paused in the database, restarted the scheduler, and, once it had generated the serialized DAGs, turned all DAGs back on.
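The workaround can be sketched against a toy SQLite table that mimics the dag_id and is_paused columns of Airflow's dag table. Run the same SQL against the real metadata database only at your own risk and with a backup. The helper names pause_all and unpause are hypothetical. One small refinement over "turned all DAGs back on" is to remember which DAGs were active beforehand, so DAGs that were deliberately paused stay paused. Airflow 2.0's CLI also offers airflow dags pause and airflow dags unpause for doing this one DAG at a time.

```python
import sqlite3


def pause_all(conn):
    """Pause every DAG and return the ids that were unpaused beforehand."""
    active = [row[0] for row in conn.execute(
        "SELECT dag_id FROM dag WHERE is_paused = 0 ORDER BY dag_id")]
    conn.execute("UPDATE dag SET is_paused = 1")
    conn.commit()
    return active


def unpause(conn, dag_ids):
    """Turn back on only the DAGs that were active before pause_all()."""
    conn.executemany("UPDATE dag SET is_paused = 0 WHERE dag_id = ?",
                     [(dag_id,) for dag_id in dag_ids])
    conn.commit()


# Toy stand-in for the metadata database's `dag` table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused BOOLEAN)")
conn.executemany("INSERT INTO dag VALUES (?, ?)",
                 [("static_dag", 0), ("dynamic_dag_1", 0), ("old_dag", 1)])
was_active = pause_all(conn)
# ... restart the scheduler here and wait until serialized_dag is populated ...
unpause(conn, was_active)
print(conn.execute("SELECT dag_id, is_paused FROM dag ORDER BY dag_id").fetchall())
# [('dynamic_dag_1', 0), ('old_dag', 1), ('static_dag', 0)]
```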

George answered Dec 25 '22 23:12


I fixed this issue in https://github.com/apache/airflow/pull/13893, which will be released as part of Airflow 2.0.1.

Airflow 2.0.1 will be released next week (most likely on 8 Feb 2021).
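The contents of the linked pull request are not reproduced here. As a hedged illustration of one general way a scheduler loop can tolerate this race, the sketch below logs and skips a DAG whose serialized row is not there yet, instead of letting the exception terminate the loop. The DAG is then retried on a later iteration. The names create_dag_runs and get_dag echo the traceback but are hypothetical toy functions, and SerializedDagNotFound is a local stand-in. This is not necessarily what the fix does.

```python
import logging

log = logging.getLogger("toy_scheduler")


class SerializedDagNotFound(Exception):
    """Local stand-in named after airflow.exceptions.SerializedDagNotFound."""


def get_dag(serialized, dag_id):
    """Toy lookup: `serialized` is a dict standing in for serialized_dag."""
    try:
        return serialized[dag_id]
    except KeyError:
        raise SerializedDagNotFound(
            f"DAG '{dag_id}' not found in serialized_dag table") from None


def create_dag_runs(dag_ids, serialized):
    """Create runs for DAGs that are serialized; log and skip the rest."""
    created = []
    for dag_id in dag_ids:
        try:
            dag = get_dag(serialized, dag_id)
        except SerializedDagNotFound:
            log.warning("DAG '%s' not serialized yet; skipping this loop", dag_id)
            continue  # picked up again once the DAG processor catches up
        created.append(dag["dag_id"])
    return created


serialized = {"static_dag": {"dag_id": "static_dag"}}
print(create_dag_runs(["static_dag", "dynamic_dag_1"], serialized))  # ['static_dag']
```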

kaxil answered Dec 25 '22 22:12