I have a series of Python tasks, one per file: file1.py, file2.py, ...
I read the Airflow docs, but I don't see how to specify the folder and filename of the Python files in the DAG.
I would like to execute those Python files directly (not the Python function through the PythonOperator).
Task1: Execute file1.py (which imports some packages)
Task2: Execute file2.py (which imports some other packages)
Any help would be appreciated. Thanks!
I know you said you "would like to execute those python files (not the Python function through Python Operator)", but I see this as probably using Airflow less effectively than you could be. I also see confusion in the previously written answers, so here is the way you asked for, followed by the way I'd recommend doing these tasks.
Assuming this layout:

```
dags/
    my_dag_for_task_1_and_2.py
    tasks/
        file1.py
        file2.py
```
Your request to avoid the `PythonOperator`:

```python
# my_dag_for_task_1_and_2.py
import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    'my_dag_for_task_1_and_2',
    default_args={
        'owner': 'me',
        'start_date': dt.datetime(…),
        …,
    },
    schedule_interval='8 * * * *',
) as dag:
    task_1 = BashOperator(
        task_id='task_1',
        bash_command='/path/to/python /path/to/dags/tasks/file1.py',
    )
    task_2 = BashOperator(
        task_id='task_2',
        bash_command='/path/to/python /path/to/dags/tasks/file2.py',
    )

    task_1 >> task_2
```
Even if you didn't write the Python from scratch for Airflow, you can still use the `PythonOperator`:

```python
# my_dag_for_task_1_and_2.py
import datetime as dt

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

import tasks.file1
import tasks.file2

with DAG(
    'my_dag_for_task_1_and_2',
    default_args={
        'owner': 'me',
        'start_date': dt.datetime(…),
        …,
    },
    schedule_interval='8 * * * *',
) as dag:
    task_1 = PythonOperator(
        task_id='task_1',
        python_callable=tasks.file1.function_in_file1,
    )
    task_2 = PythonOperator(
        task_id='task_2',
        python_callable=tasks.file2.function_in_file2,  # maybe main?
    )

    task_1 >> task_2
```
To execute the Python file as a whole, use the `BashOperator` (as in liferacer's answer):

```python
from airflow.operators.bash_operator import BashOperator

bash_task = BashOperator(
    task_id='bash_task',
    bash_command='python file1.py',
    dag=dag,
)
```
Then, to do it using the `PythonOperator`, call your `main` function. You should already have a `__main__` block, so put what happens in there into a `main` function, such that your `file1.py` looks like this:
```python
def main():
    """This gets executed if `python file1.py` gets called."""
    # my code


if __name__ == '__main__':
    main()
```
Then your DAG definition:

```python
from airflow.operators.python_operator import PythonOperator

import file1

python_task = PythonOperator(
    task_id='python_task',
    python_callable=file1.main,
    dag=dag,
)
```
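For a self-contained sketch of why that refactor works (the `compute` helper and its return value below are invented for illustration, not from your actual files): when the DAG file imports the module, the `__main__` block does not run, but the same `main` function stays callable as the operator's `python_callable`, and running the file directly still behaves as before.

```python
def compute():
    """Stand-in for the real task logic that lives in file1.py."""
    return sum(range(10))


def main():
    """Runs when `python file1.py` is executed directly, and can
    also be handed to a PythonOperator as the python_callable."""
    return compute()


if __name__ == '__main__':
    # Only reached when the file is run as a script, never when it
    # is imported by the DAG definition file.
    print(main())
```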