I have a bash script that creates a file if it does not already exist. I want to run it from Airflow, but the task fails when I try. How do I do this?
#!/bin/bash
# create_file.sh

file=filename.txt

if [ ! -e "$file" ] ; then
    touch "$file"
fi

if [ ! -w "$file" ] ; then
    echo cannot write to $file
    exit 1
fi
and here's how I'm calling it in Airflow:
create_command = """
./scripts/create_file.sh
"""

t1 = BashOperator(
    task_id='create_file',
    bash_command=create_command,
    dag=dag
)

lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
The Airflow BashOperator does exactly what you are looking for. It is a very simple but powerful operator, allowing you to execute either a bash script, a command or a set of commands from your DAGs.
From the tutorial this is OK:
t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)
But you're passing a multi-line command to it
create_command = """
./scripts/create_file.sh
"""
should be
create_command = "./scripts/create_file.sh "
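To make the difference concrete, here is a minimal sketch comparing the two strings. It assumes the triple-quoted literal in the question originally spanned several lines, and `normalize_command` is a hypothetical helper, not part of Airflow. The trailing space is kept on purpose: as far as I know, Airflow treats a `bash_command` that ends in `.sh` as a Jinja template file to load, and the space prevents that.

```python
# The triple-quoted form carries leading and trailing newlines.
multi_line = """
./scripts/create_file.sh
"""

# The single-line form suggested above, with its trailing space.
single_line = "./scripts/create_file.sh "


def normalize_command(command):
    """Hypothetical helper: drop surrounding whitespace, keep one trailing space."""
    return command.strip() + " "


print(repr(multi_line))                     # '\n./scripts/create_file.sh\n'
print(repr(normalize_command(multi_line)))  # './scripts/create_file.sh '
```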
Moreover, you also have to make sure that you are in the correct directory to avoid cryptic errors. Do it like this for example:
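import os

create_command = "./scripts/create_file.sh "

# Check the path without its trailing space; a relative path is
# resolved against the current working directory of this process.
if os.path.exists(create_command.strip()):
    t1 = BashOperator(
        task_id='create_file',
        bash_command=create_command,
        dag=dag
    )
else:
    raise Exception("Cannot locate {}".format(create_command))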
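A relative path only works if the process happens to start in the right directory, so one option is to build an absolute path instead. The sketch below is an illustration rather than an Airflow API: `build_bash_command` and its `base_dir` parameter are hypothetical names, and it assumes Python 3 (for `FileNotFoundError` and `shlex.quote`). It runs the script through `bash`, so the file does not need its executable bit set.

```python
import os
import shlex


def build_bash_command(script_rel_path, base_dir):
    """Hypothetical helper: return a command string suitable for bash_command.

    script_rel_path is resolved against base_dir, so the result does not
    depend on the directory the task is started from.
    """
    script_path = os.path.abspath(os.path.join(base_dir, script_rel_path))
    if not os.path.isfile(script_path):
        raise FileNotFoundError("Cannot locate {}".format(script_path))
    # Quote the path in case it contains spaces, and keep the trailing
    # space so the command string does not end in ".sh".
    return "bash {} ".format(shlex.quote(script_path))
```

In a DAG file this could be used as `bash_command=build_bash_command("scripts/create_file.sh", os.path.dirname(os.path.abspath(__file__)))`, assuming the `scripts` folder sits next to the DAG file.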
From the documentation:
t2 = BashOperator(
    task_id='bash_example',
    # "scripts" folder is under "/usr/local/airflow/dags"
    bash_command="scripts/test.sh",
    dag=dag)