Airflow's packaged DAGs seem like a great building block for a sane production Airflow deployment.
I have a DAG with dynamic subDAGs, driven by a config file, something like:
config.yaml:
imports:
- project_foo
- project_bar
which yields subDAG tasks like imports.project_{foo|bar}.step{1|2|3}.
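For context, the generation itself looks roughly like this (a minimal sketch with DummyOperator placeholders for the real steps; assumes Airflow 1.x's SubDagOperator, not my exact code):

import yaml
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

default_args = {'start_date': datetime(2017, 1, 1)}

def make_subdag(parent_dag_id, project):
    """Builds the imports.<project> subDAG with step1 -> step2 -> step3."""
    subdag = DAG('%s.%s' % (parent_dag_id, project), default_args=default_args)
    step1, step2, step3 = [DummyOperator(task_id='step%d' % i, dag=subdag)
                           for i in (1, 2, 3)]
    step1 >> step2 >> step3
    return subdag

dag = DAG('imports', default_args=default_args, schedule_interval='@daily')
config = yaml.safe_load(open('config.yaml'))  # reading this is the crux, see below
for project in config['imports']:
    SubDagOperator(task_id=project,
                   subdag=make_subdag(dag.dag_id, project),
                   dag=dag)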
I've normally read in the config file using Python's open function, a la config = open(os.path.join(os.path.split(__file__)[0], 'config.yaml')).read()
Unfortunately, when using packaged DAGs, this results in an error:
Broken DAG: [/home/airflow/dags/workflows.zip] [Errno 20] Not a directory: '/home/airflow/dags/workflows.zip/config.yaml'
(Inside a packaged DAG, __file__ points into the archive, so plain open() ends up trying to traverse workflows.zip as if it were a directory.)
Any thoughts / best practices to recommend here?
It's a bit of a kludge, but I eventually just fell back on reading the zip file contents via ZipFile.
import logging
import re
from zipfile import ZipFile

import yaml

def get_config(yaml_filename):
    """Parses and returns the given YAML config file.

    For packaged DAGs, gracefully handles unzipping.
    """
    # Split a path like '/dags/workflows.zip/config.yaml' into the archive
    # ('/dags/workflows.zip') and the member inside it ('/config.yaml');
    # for a plain path, the first group is None.
    zip_path, post_zip = re.search(r'(.*\.zip)?(.*)', yaml_filename).groups()
    if zip_path:
        # ZipFile member names never start with '/', hence the lstrip.
        contents = ZipFile(zip_path).read(post_zip.lstrip('/'))
    else:
        contents = open(post_zip).read()
    result = yaml.safe_load(contents)
    logging.info('Parsed config: %s', result)
    return result
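For reference, the regex split behaves like this on the two path shapes (hypothetical paths):

import re

for path in ('/home/airflow/dags/workflows.zip/config.yaml',
             '/home/airflow/dags/config.yaml'):
    print(re.search(r'(.*\.zip)?(.*)', path).groups())
# ('/home/airflow/dags/workflows.zip', '/config.yaml')
# (None, '/home/airflow/dags/config.yaml')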
The helper works as you'd expect when called from the main dag.py:
get_config(os.path.join(os.path.split(__file__)[0], 'config.yaml'))
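An alternative I haven't battle-tested: since packaged DAGs are loaded via zipimport, pkgutil.get_data should in principle resolve a resource sitting next to the module, zipped or not, assuming the DAG module is importable by name:

import pkgutil

import yaml

# pkgutil.get_data delegates to the module's loader, so it reads out of
# the archive for a packaged DAG and off disk otherwise. Assumes
# config.yaml sits alongside this module.
config = yaml.safe_load(pkgutil.get_data(__name__, 'config.yaml'))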