Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to run Airflow PythonOperator in a virtual environment

I have several python files that I'm currently executing using BashOperator. This allows me the flexibility to choose the python virtual environment easily.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
   'owner': 'airflow',
    'depends_on_past': False,
    ...}

dag = DAG('python_tasks', default_args=default_args, schedule_interval="23 4 * * *")

t1 = BashOperator(
                 task_id='task1',
                bash_command='~/anaconda3/envs/myenv/bin/python 
                              /python_files/python_task1.py',
                 dag=dag)

How can I achieve the same using PythonOperator to something like this?

from airflow.operators.bash_operator import PythonOperator
import python_files.python_task1

python_task = PythonOperator(
              task_id='python_task',
              python_callable=python_task1.main,
             dag=dag)

I assume PythonOperator will use the system python environment. I've found that Airflow has the PythonVirtualenvOperator, but this appears to work by creating a new virtual env on the fly using the specified requirements. I'd prefer to use an existing one that is already properly configured. How can I run PythonOperator with a specified python path?

like image 900
khuang834 Avatar asked Apr 09 '18 17:04

khuang834


People also ask

What is Op_kwargs in Airflow?

op_kwargs (dict) – A dict of keyword arguments to pass to python_callable. provide_context (bool) – if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates.


1 Answers

First things first: you should not (in general) rely on pre-existing resources for your Operators. You operators should be portable, so using longstanding virtualenvs is somewhat against that principle. That being said, it's not as much of a big deal, just like you have to preinstall packages to the global environment you can pre-bake a few environments. Or, you can let the Operator create the environment and subsequent operators may reuse it - which is, I believe, the easiest and most dangerous approach.

Implementing a "virtualenv cache" shouldn't be difficult. Reading the implementation of PythonVirtualenvOperator's execution method:

def execute_callable(self):
    with TemporaryDirectory(prefix='venv') as tmp_dir:
        ...
        self._execute_in_subprocess(
            self._generate_python_cmd(tmp_dir,
                                      script_filename,
                                      input_filename,
                                      output_filename,
                                      string_args_filename))
        return self._read_result(output_filename)

So it looks like it doesn't delete the virtualenv explicitly (it relies on TemporaryDirectory to do that). You can subclass PythonVirtualenvOperator and simply use your own context manager that reuses temporary directories:

import glob

@contextmanager
def ReusableTemporaryDirectory(prefix):
    try:
        existing = glob.glob('/tmp/' + prefix + '*')
        if len(existing):
            name = existing[0]
        else:
            name = mkdtemp(prefix=prefix)
        yield name
    finally:
        # simply don't delete the tmp dir
        pass

def execute_callable(self):
    with ReusableTemporaryDirectory(prefix='cached-venv') as tmp_dir:
        ...

Naturally, you can get rid of the try-finally in ReusableTemporaryDirectory and put back the usual suffix and dir arguments, I made minimal changes to make it easy to compare with the original TemporaryDirectory class.

With this, your virtualenv won't be discarded but newer dependencies will be eventually installed by the Operator.

like image 129
villasv Avatar answered Oct 05 '22 13:10

villasv