 

How to use Airflow to run a folder of Python files?

Tags: python, airflow

I have a series of Python tasks inside a folder of Python files: file1.py, file2.py, ...

I read the Airflow docs, but I don't see how to specify the folder and filenames of the Python files in the DAG.

I would like to execute those Python files directly (not as Python functions through the PythonOperator).

Task1: Execute file1.py (with some import package)

Task2: Execute file2.py (with some other import package)

Any help would be appreciated. Thanks, regards

asked Sep 21 '16 07:09 by tensor


2 Answers

I know you said you "would like to execute those python files (not the Python function through Python Operator)," but I see this as probably using Airflow less effectively than you could be. I also see confusion in the previously written answers, so here is both the way you asked for and the way I'd recommend doing these tasks:

Assuming:

dags/
    my_dag_for_task_1_and_2.py
    tasks/
        file1.py
        file2.py

Your request to avoid the PythonOperator:

# my_dag_for_task_1_and_2.py
import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    'my_dag_for_task_1_and_2',
    default_args={
        'owner': 'me',
        'start_date': dt.datetime(…),
        …,
    },
    schedule_interval='8 * * * *',
) as dag:
    task_1 = BashOperator(
        task_id='task_1',
        bash_command='/path/to/python /path/to/dags/tasks/file1.py',
    )
    task_2 = BashOperator(
        task_id='task_2',
        bash_command='/path/to/python /path/to/dags/tasks/file2.py',
    )
    task_1 >> task_2

And the way I'd recommend, even though you didn't write the Python from scratch for Airflow, using the PythonOperator:

# my_dag_for_task_1_and_2.py
import datetime as dt

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from tasks import file1, file2

with DAG(
    'my_dag_for_task_1_and_2',
    default_args={
        'owner': 'me',
        'start_date': dt.datetime(…),
        …,
    },
    schedule_interval='8 * * * *',
) as dag:
    task_1 = PythonOperator(
        task_id='task_1',
        python_callable=file1.function_in_file1,
    )
    task_2 = PythonOperator(
        task_id='task_2',
        python_callable=file2.function_in_file2,  # maybe main?
    )
    task_1 >> task_2
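Since the original question asks about a whole folder of files, you could also generate one task per file by globbing the folder instead of listing each file by hand. This is a sketch (build_commands and the tasks_dir argument are my own names, not from the question); each resulting string would be passed as the bash_command of a BashOperator inside a loop in the DAG file:

```python
import glob
import os


def build_commands(tasks_dir, python_bin="python"):
    """Build one `python <file>` command per .py file in tasks_dir.

    Sorting keeps task creation order stable across scheduler parses.
    """
    files = sorted(glob.glob(os.path.join(tasks_dir, "*.py")))
    return ["{} {}".format(python_bin, f) for f in files]
```

Because the DAG file is re-parsed periodically by the scheduler, a loop like this picks up new files dropped into tasks/ as new tasks on the next parse.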
answered Sep 22 '22 17:09 by dlamblin


To execute the Python file as a whole, use the BashOperator (as in liferacer's answer):

from airflow.operators.bash_operator import BashOperator

bash_task = BashOperator(
    task_id='bash_task',
    bash_command='python file1.py',
    dag=dag,
)

Then, to do it with the PythonOperator, call your main function. You should already have a __main__ block, so move what happens there into a main function, so that your file1.py looks like this:

def main():
    """This gets executed if `python file1.py` is called."""
    # my code

if __name__ == '__main__':
    main()
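As a quick sanity check of the pattern (a toy stand-in for file1.py, not Airflow code; the return value is just for illustration): the same function runs under `python file1.py` and is importable for the PythonOperator.

```python
# Toy stand-in for file1.py: main() holds the work that used to sit in the
# __main__ block, so it can be imported (for a PythonOperator's
# python_callable) or run directly from the command line.
def main():
    """This gets executed if `python file1.py` is called."""
    return "file1 ran"


if __name__ == '__main__':
    print(main())
```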

Then your dag definition:

from airflow.operators.python_operator import PythonOperator

import file1

python_task = PythonOperator(
    task_id='python_task',
    python_callable=file1.main,
    dag=dag,
)
answered Sep 21 '22 17:09 by Roman