I have several Python files that I'm currently executing using BashOperator. This gives me the flexibility to choose the Python virtual environment easily.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    ...}

dag = DAG('python_tasks', default_args=default_args, schedule_interval="23 4 * * *")

t1 = BashOperator(
    task_id='task1',
    bash_command='~/anaconda3/envs/myenv/bin/python /python_files/python_task1.py',
    dag=dag)
How can I achieve the same using PythonOperator, with something like this?
from airflow.operators.python_operator import PythonOperator
from python_files import python_task1

python_task = PythonOperator(
    task_id='python_task',
    python_callable=python_task1.main,
    dag=dag)
I assume PythonOperator will use the system Python environment. I've found that Airflow has the PythonVirtualenvOperator, but that appears to work by creating a new virtualenv on the fly from the specified requirements. I'd prefer to use an existing one that is already properly configured. How can I run PythonOperator with a specified Python path?
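For reference, my understanding is that PythonVirtualenvOperator usage looks roughly like this (a sketch; the requirement list is only an example):

from airflow.operators.python_operator import PythonVirtualenvOperator

def callable_in_venv():
    # imports must happen inside the callable, since it is
    # serialized and executed in the freshly built virtualenv
    import numpy
    return numpy.__version__

venv_task = PythonVirtualenvOperator(
    task_id='venv_task',
    python_callable=callable_in_venv,
    requirements=['numpy'],  # built into a brand-new venv on every run
    system_site_packages=False,
    dag=dag)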
From the PythonOperator documentation:

op_kwargs (dict) – A dict of keyword arguments to pass to python_callable.
provide_context (bool) – If set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs corresponds exactly to what you can use in your jinja templates.
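As a sketch of how those two parameters are used together (the task and argument names here are just illustrative):

from airflow.operators.python_operator import PythonOperator

def greet(name, **context):
    # `name` arrives via op_kwargs; `context` holds the Airflow-provided
    # kwargs enabled by provide_context=True (e.g. the execution date `ds`)
    print('Hello %s, running for %s' % (name, context['ds']))

greet_task = PythonOperator(
    task_id='greet_task',
    python_callable=greet,
    op_kwargs={'name': 'world'},
    provide_context=True,
    dag=dag)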
First things first: you should not (in general) rely on pre-existing resources for your Operators. Your operators should be portable, so using long-lived virtualenvs goes somewhat against that principle. That said, it's not that big a deal: just as you have to preinstall packages into the global environment, you can pre-bake a few environments. Or, you can let the Operator create the environment and have subsequent operators reuse it - which is, I believe, the easiest and most dangerous approach.
Implementing a "virtualenv cache" shouldn't be difficult. Reading the implementation of PythonVirtualenvOperator's execute_callable method:
def execute_callable(self):
    with TemporaryDirectory(prefix='venv') as tmp_dir:
        ...
        self._execute_in_subprocess(
            self._generate_python_cmd(tmp_dir,
                                      script_filename,
                                      input_filename,
                                      output_filename,
                                      string_args_filename))
        return self._read_result(output_filename)
So it looks like it doesn't delete the virtualenv explicitly (it relies on TemporaryDirectory to do that). You can subclass PythonVirtualenvOperator and simply use your own context manager that reuses temporary directories:
import glob
from contextlib import contextmanager
from tempfile import mkdtemp

@contextmanager
def ReusableTemporaryDirectory(prefix):
    try:
        # reuse an existing directory matching the prefix, if any
        existing = glob.glob('/tmp/' + prefix + '*')
        if len(existing):
            name = existing[0]
        else:
            name = mkdtemp(prefix=prefix)
        yield name
    finally:
        # simply don't delete the tmp dir
        pass
# override this method in your PythonVirtualenvOperator subclass:
def execute_callable(self):
    with ReusableTemporaryDirectory(prefix='cached-venv') as tmp_dir:
        ...
Naturally, you can get rid of the try-finally in ReusableTemporaryDirectory and put back the usual suffix and dir arguments; I made minimal changes to make it easy to compare with the original TemporaryDirectory class.
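For completeness, here's a sketch of what restoring those arguments might look like (mirroring the stdlib mkdtemp defaults; still deliberately skipping cleanup):

import glob
import os
import tempfile
from contextlib import contextmanager
from tempfile import mkdtemp

@contextmanager
def ReusableTemporaryDirectory(suffix='', prefix='tmp', dir=None):
    # look for a previously created directory in the requested
    # parent directory (or the system default temp location)
    base = dir if dir is not None else tempfile.gettempdir()
    existing = glob.glob(os.path.join(base, prefix + '*' + suffix))
    if existing:
        name = existing[0]
    else:
        name = mkdtemp(suffix=suffix, prefix=prefix, dir=dir)
    yield name
    # intentionally no cleanup: the directory is kept for reuse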
With this, your virtualenv won't be discarded, and since the Operator still runs its install step on each execution, newer dependencies will eventually be installed into the cached environment.
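To tie it together, usage might look roughly like this; the subclass name CachedVirtualenvOperator is just illustrative:

from airflow.operators.python_operator import PythonVirtualenvOperator

class CachedVirtualenvOperator(PythonVirtualenvOperator):
    def execute_callable(self):
        with ReusableTemporaryDirectory(prefix='cached-venv') as tmp_dir:
            # same body as the parent's implementation, only the
            # throwaway TemporaryDirectory has been swapped out
            ...

def my_callable():
    import requests  # imports must live inside the callable
    return requests.__version__

cached_task = CachedVirtualenvOperator(
    task_id='cached_task',
    python_callable=my_callable,
    requirements=['requests'],  # example requirement
    dag=dag)

The first run creates a directory under /tmp matching the cached-venv prefix; every later run finds and reuses it.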