I have 2 files inside dags directory - dag_1.py and dag_2.py
dag_1.py creates a static DAG and dag_2.py creates dynamic DAGs based on external json files at some location.
The static DAG (created by dag_1.py) contains a task at a later stage which generates some of these input json files for dag_2.py and dynamic DAGs are created in this manner.
This used to work fine with Airflow 1.x versions where DAG Serialization was not used. But with Airflow 2.0 DAG Serialization has become mandatory. Sometimes, I get the following exception in the Scheduler when dynamic DAGs are spawned -
[2021-01-02 06:17:39,493] {scheduler_job.py:1293} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
self._run_scheduler_loop()
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1474, in _do_scheduling
self._create_dag_runs(query.all(), session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1557, in _create_dag_runs
dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
return func(*args, **kwargs)
File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 171, in get_dag
self._add_dag_from_db(dag_id=dag_id, session=session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 227, in _add_dag_from_db
raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'dynamic_dag_1' not found in serialized_dag table
After this the scheduler gets terminated which is expected. When I check the table manually after this error, I am able to see the DAG entry in it.
This issue is not reproducible all the time. What can be the probable cause for this? Is there any Airflow configuration which I should try tweaking?
We had the same issue after updating in the following order:
I've followed their guide through, and we had no issues until at some random point after a few hours scheduler started crashing complaining about random DAGs not being found in the database.
Our deployment procedure involves clearing out /opt/airflow/dags
folder and doing a clean install every time (we store dags and supporting code in python packages)
So every now and then on 1.10.x version we had cases when scheduler parsed an empty folder and wiped serialized dags from the database, but it always was able to restore the picture on next parse
Apparently in 2.0, as a part of the effort to make scheduler HA, they fully separated DAG processor and scheduler. Which leads to a race condition:
In order to get rid of this problem, I disabled scheduling of all DAGs by updating is_paused
in the database, restarted the scheduler and once it generated serialized dags, turned all dags back ON
I fixed this issue in https://github.com/apache/airflow/pull/13893 which will be released as part for Airflow 2.0.1.
Will release Airflow 2.0.1 next week (8 Feb 2021 - most likely).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With