I have a simple job that I'd like to move into Airflow, if possible. As it stands, I have a chain of bash scripts that access a server, download the latest version of a file, and then perform a variety of downstream manipulations on that file. The chain is started like this:
exec ./somescript.sh somefileurl
What I'd like to know is: how can I pass in the URL to this file every time I need to run this process?
It seems that if I try to run the bash script as a bash command like so:
download = BashOperator(
    task_id='download_release',
    bash_command='somescript.sh',
    # params={'URL': 'somefileurl'},
    dag=dag)
I have no way of passing in the one parameter that the bash script requires. Otherwise, if I try to send the bash script in as a bash command like so:
download = BashOperator(
    task_id='download_release',
    bash_command='./somescript.sh {{ URL }}',
    params={'URL': 'somefileurl'},
    dag=dag)
I receive an execution error as the program tries to execute the script in the context of a temporary directory. This breaks the script as it requires access to some credentials files that sit in the same directory and I'd like to keep the relative file locations intact...
Thoughts?
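One option is to cd into the script's directory inside the command itself, so the relative file locations stay intact:
download = BashOperator(
    task_id='download_release',
    bash_command='cd {{ params.dir }} && ./somescript.sh {{ params.url }}',
    params={'url': 'somefileurl',
            'dir': 'somedir'},
    dag=dag)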
I did not implement any parameter passing yet, though.
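To see why the `cd ... &&` prefix matters, here is a minimal sketch that reproduces the problem outside Airflow using only the standard library. It is not Airflow code: the script body, the `credentials.txt` file name, and the two temporary directories are stand-ins invented for illustration, and the script is invoked through `sh` so the sketch does not depend on execute permissions.

```python
import os
import shlex
import subprocess
import tempfile

# Hypothetical stand-in for somescript.sh: it reads a file via a relative path.
SCRIPT = """\
cat credentials.txt > /dev/null && echo "downloaded $1"
"""


def run(command, cwd):
    """Run a shell command line from the given working directory."""
    return subprocess.run(
        ["sh", "-c", command], cwd=cwd, capture_output=True, text=True
    )


with tempfile.TemporaryDirectory() as script_dir, \
        tempfile.TemporaryDirectory() as scratch_dir:
    # Put the script and its (fake) credentials file side by side.
    with open(os.path.join(script_dir, "credentials.txt"), "w") as f:
        f.write("secret\n")
    with open(os.path.join(script_dir, "somescript.sh"), "w") as f:
        f.write(SCRIPT)

    quoted_dir = shlex.quote(script_dir)
    # scratch_dir plays the role of the temporary directory the task runs in.
    without_cd = run(f"sh {quoted_dir}/somescript.sh somefileurl", cwd=scratch_dir)
    with_cd = run(f"cd {quoted_dir} && sh ./somescript.sh somefileurl", cwd=scratch_dir)

print("without cd, exit code:", without_cd.returncode)
print("with cd, output:", with_cd.stdout.strip())
```

Run from the unrelated directory, the script cannot find its credentials file and exits with a non-zero status; with the `cd` prefix, the relative path resolves and the script succeeds.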
You can pass parameters from the CLI using --conf '{"key":"value"}' and then read them in the DAG file as "{{ dag_run.conf["key"] }}" in any templated field.
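Getting the shell quoting of the JSON payload right by hand is error-prone, so a small helper can build the command line. The sketch below assumes the Airflow 2 CLI form `airflow dags trigger` (in Airflow 1.x the equivalent subcommand is `airflow trigger_dag`); the DAG id `download_dag` and the helper name `build_trigger_command` are made up for illustration.

```python
import json
import shlex


def build_trigger_command(dag_id, conf):
    """Return a shell-safe `airflow dags trigger` command carrying conf as JSON."""
    conf_json = json.dumps(conf)
    args = ["airflow", "dags", "trigger", "--conf", conf_json, dag_id]
    # shlex.quote wraps the JSON in single quotes so the shell passes it intact.
    return " ".join(shlex.quote(arg) for arg in args)


# "download_dag" is a hypothetical DAG id; "URL" matches the key read in the template.
command = build_trigger_command("download_dag", {"URL": "somefileurl"})
print(command)
```

The value passed under `URL` is then available in the DAG as `{{ dag_run.conf["URL"] }}`.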
In task_1 you can download data from table_1 into a dataframe, process it, and save it to another table, table_2 (df.to_sql()). Then pass the name of the table to the next task using XCom.
A note on start_date: Airflow runs the tasks for a given interval at the end of that interval, so it will not start its first run until after 11:59 pm on 01-01-2022, that is, at midnight on the following day (2 Jan 2022).
Here is an example of passing a parameter to your BashOperator:
templated_command = """
cd /working_directory
./somescript.sh {{ dag_run.conf['URL'] }}
"""
download = BashOperator(
    task_id='download_release',
    bash_command=templated_command,
    dag=dag)
For a discussion of this, see "passing parameters to externally triggered dag". Airflow has two example DAGs that demonstrate this: example_trigger_controller_dag and example_trigger_target_dag. Also, see the Airflow API reference on macros.