If I have multiple Airflow DAGs with some overlapping Python package dependencies, how can I keep each project's dependencies decoupled? E.g. if I had projects A and B on the same server, I would run each of them with something like...
source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate
Basically, I would like to run DAGs in the same way: each DAG uses Python scripts that may have overlapping package dependencies, and I would like to develop them separately (i.e. not have to update all code using a package when I want to update the package for just one project). Note, I've been using the BashOperator to run Python tasks like...
do_stuff = BashOperator(
    task_id='my_task',
    bash_command='python /path/to/script.py',
    execution_timeout=timedelta(minutes=30),
    dag=dag)
Is there a way to get this working? Is there some other best-practice way that Airflow intends for people to address (or avoid) these kinds of problems?
By default, Airflow uses the SequentialExecutor, which executes tasks sequentially no matter what. To allow Airflow to run tasks in parallel, you will need to create a database in Postgres or MySQL, configure it in airflow.cfg (the sql_alchemy_conn parameter), and then change your executor to LocalExecutor.
Basic dependencies between Airflow tasks can be set in two ways: using the bitshift operators (<< and >>), or using the set_upstream and set_downstream methods.
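To make the relationship between the two styles concrete, here is a toy sketch in plain Python. The `Task` class is a hypothetical stand-in written for illustration, not Airflow's operator class; it only mimics the idea that `a >> b` behaves like `a.set_downstream(b)` and `a << b` behaves like `a.set_upstream(b)`.

```python
class Task:
    """Toy stand-in for an Airflow operator (hypothetical, for illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # ids of tasks that must run before this one
        self.downstream = set()  # ids of tasks that run after this one

    def set_downstream(self, other):
        # Record that `other` runs after `self`.
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)

    def set_upstream(self, other):
        # Record that `other` runs before `self`.
        other.set_downstream(self)

    def __rshift__(self, other):
        # a >> b means "a, then b"; returning `other` allows chaining.
        self.set_downstream(other)
        return other

    def __lshift__(self, other):
        # a << b means "b, then a".
        self.set_upstream(other)
        return other


extract, transform, load = Task("extract"), Task("transform"), Task("load")
report = Task("report")

extract >> transform >> load   # bitshift style, chained left to right
report.set_upstream(load)      # method style, same kind of edge

print(sorted(load.upstream), sorted(load.downstream))
```

With real operators the two styles are interchangeable in the same way, so the choice is mostly about readability.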
You can do it in one of these ways: add your modules to one of the folders that Airflow automatically adds to PYTHONPATH; add the extra folders where you keep your code to PYTHONPATH; or package your code into a Python package and install it together with Airflow.
Based on discussion from the apache-airflow mailing list, the simplest answer that fits the modular way in which I am using various Python scripts for tasks is to directly call the virtualenv's Python interpreter binary for each script or module, e.g.
source /path/to/virtualenv_a/activate
python script_a.py
deactivate
source /path/to/virtualenv_b/activate
python script_b.py
deactivate
would translate to something like
do_stuff_a = BashOperator(
    task_id='my_task_a',
    bash_command='/path/to/virtualenv_a/bin/python /path/to/script_a.py',
    execution_timeout=timedelta(minutes=30),
    dag=dag)
do_stuff_b = BashOperator(
    task_id='my_task_b',
    bash_command='/path/to/virtualenv_b/bin/python /path/to/script_b.py',
    execution_timeout=timedelta(minutes=30),
    dag=dag)
in an airflow dag.
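If many tasks follow this pattern, the command strings can be built by a small helper rather than typed by hand. The following is a minimal sketch using only the standard library; `venv_command` is a hypothetical helper name, and it assumes a POSIX virtualenv layout where the interpreter lives at `<venv>/bin/python`.

```python
import shlex
from pathlib import PurePosixPath


def venv_command(venv_dir, script, *args):
    """Build a shell command that runs `script` with the interpreter of `venv_dir`."""
    # Assumption: POSIX virtualenv layout, interpreter at <venv>/bin/python.
    interpreter = PurePosixPath(venv_dir) / "bin" / "python"
    parts = [str(interpreter), str(script), *map(str, args)]
    # Quote every part so spaces or shell metacharacters cannot split the command.
    return " ".join(shlex.quote(part) for part in parts)


command_a = venv_command("/path/to/virtualenv_a", "/path/to/script_a.py")
command_b = venv_command("/path/to/virtualenv_b", "/path/to/script_b.py")

print(command_a)
print(command_b)
```

Each resulting string could then be passed as the `bash_command` of the corresponding BashOperator. Because the interpreter is called by its full path, no `source .../activate` or `deactivate` step is needed, and each script only sees the packages installed in its own virtualenv.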
To the question of passing args to the Tasks, it depends on the nature of the args you want to pass in. In my case, certain args depend on what a data table looks like on the day the DAG is run (e.g. the highest timestamp record in the table). To add these args to the Tasks, I have a "config" DAG that runs before this one. In the "config" DAG, there is a Task that generates the args for the "real" DAG as a Python dict and writes it to a pickle file. The "config" DAG then has a Task that is a TriggerDagRunOperator, which activates the "real" DAG. The "real" DAG has initial logic to read from the pickle file generated by the "config" DAG (in my case, as a dict), and I read the values into the bash_command string like bash_command=f"python script.py {configs['arg1']}".
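The pickle hand-off described above can be sketched with the standard library alone. This is an illustration under assumptions, not the exact code from my DAGs: the function names, the file name `dag_configs.pkl`, and the sample timestamp are all made up, and the Airflow operators themselves are left out so the snippet runs on its own.

```python
import pickle
import shlex
import tempfile
from pathlib import Path


def write_configs(path, configs):
    """'Config' DAG side: persist the computed args as a pickle file."""
    with open(path, "wb") as handle:
        pickle.dump(configs, handle)


def read_configs(path):
    """'Real' DAG side: load the args written by the 'config' DAG."""
    with open(path, "rb") as handle:
        return pickle.load(handle)


def build_bash_command(configs):
    # Quote the value because it ends up on a shell command line.
    return f"python script.py {shlex.quote(str(configs['arg1']))}"


with tempfile.TemporaryDirectory() as tmp_dir:
    config_path = Path(tmp_dir) / "dag_configs.pkl"  # hypothetical file name
    write_configs(config_path, {"arg1": "2020-01-01 00:00:00"})  # made-up value
    configs = read_configs(config_path)
    bash_command = build_bash_command(configs)

print(bash_command)
```

Quoting matters here because values such as timestamps contain spaces. Also note that pickle files should only be loaded from locations you control, since unpickling untrusted data can execute arbitrary code.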