I use Airflow to manage the execution and scheduling of ETL tasks. A DAG has been created and it works fine. But is it possible to pass parameters when manually triggering the DAG via the CLI?
For example: my DAG runs every day at 01:30 and processes yesterday's data (the time range from 01:30 yesterday to 01:30 today). There might be issues with the data source, in which case I need to re-process that data with a manually specified time range.
So can I create an Airflow DAG that, when scheduled, uses the default time range from 01:30 yesterday to 01:30 today, but that I can also trigger manually with the time range passed as parameters if anything goes wrong with the data source?
As far as I know, airflow test has a -tp option that can pass params to the task, but this is only for testing a specific task, and airflow trigger_dag doesn't have a -tp option. So is there any way to run trigger_dag and pass parameters to the DAG so that the Operator can read them?
Thanks!
In order to use the SimpleHttpOperator to trigger another DAG, you need to define the following: endpoint : This should be of the form '/api/v1/dags/<dag-id>/dagRuns' where <dag-id> is the ID of the DAG you want to trigger. data : To trigger a DAG Run using this endpoint, you must provide an execution date.
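As a rough illustration of the endpoint and data described above, the sketch below only builds the path and JSON body and sends nothing. The helper name build_trigger_request is hypothetical, and the body field names "execution_date" and "conf" are assumptions about the REST API's DAG run schema that should be checked against the API reference for your Airflow version.

```python
import json
from datetime import datetime, timezone


def build_trigger_request(dag_id, conf, execution_date):
    """Return (endpoint, data) for triggering a DAG run over HTTP.

    Hypothetical helper for illustration only. The field names
    "execution_date" and "conf" are assumed, not taken from the thread.
    """
    endpoint = "/api/v1/dags/{}/dagRuns".format(dag_id)
    body = {
        "execution_date": execution_date.isoformat(),
        "conf": conf,
    }
    return endpoint, json.dumps(body)


endpoint, data = build_trigger_request(
    "example_dag_conf",
    {"message": "value"},
    datetime(2019, 1, 2, 1, 30, tzinfo=timezone.utc),
)
```

The two returned values correspond to the endpoint and data arguments mentioned above.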
You can pass parameters from the CLI using --conf '{"key":"value"}' and then use them in the DAG file as "{{ dag_run.conf["key"] }}" in a templated field.
CLI:
airflow trigger_dag 'example_dag_conf' -r 'run_id' --conf '{"message":"value"}'
DAG File:
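from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

args = {
    'start_date': datetime.utcnow(),
    'owner': 'airflow',
}

# schedule_interval=None: this DAG only runs when triggered manually
dag = DAG(
    dag_id='example_dag_conf',
    default_args=args,
    schedule_interval=None,
)


def run_this_func(ds, **kwargs):
    # dag_run.conf holds the JSON passed with --conf on the command line
    print("Remotely received value of {} for key=message".
          format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,  # passes the context (including dag_run) as kwargs
    python_callable=run_this_func,
    dag=dag,
)

# You can also access the DagRun object in templates
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: '
                 '{{ dag_run.conf["message"] if dag_run else "" }}" ',
    dag=dag,
)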
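To cover the case from the question (a default window for scheduled runs, an explicit window for manual ones), the task can fall back to a default whenever dag_run.conf carries no range. Below is a minimal sketch in plain Python, with no Airflow imports so it can be tried on its own. The helper name resolve_time_range and the conf keys "start" and "end" are hypothetical, the values are assumed to be ISO-8601 strings, and datetime.fromisoformat needs Python 3.7 or later.

```python
from datetime import datetime, timedelta


def resolve_time_range(conf, run_time):
    """Return (start, end) for one run.

    conf     -- dict taken from dag_run.conf, or None for a scheduled run
    run_time -- datetime marking the end of the default window
    The "start" and "end" keys are hypothetical names chosen for this sketch.
    """
    conf = conf or {}
    # Use the explicit end if given, otherwise the run time.
    end = datetime.fromisoformat(conf["end"]) if "end" in conf else run_time
    # Use the explicit start if given, otherwise 24 hours before the end.
    start = (
        datetime.fromisoformat(conf["start"])
        if "start" in conf
        else end - timedelta(days=1)
    )
    if start >= end:
        raise ValueError("start must be earlier than end")
    return start, end


# Scheduled run: no conf, so the default 24-hour window applies.
scheduled = resolve_time_range(None, datetime(2019, 1, 2, 1, 30))

# Manual run: the window comes from the JSON passed with --conf.
manual = resolve_time_range(
    {"start": "2018-12-20T01:30:00", "end": "2018-12-25T01:30:00"},
    datetime(2019, 1, 2, 1, 30),
)
```

Inside a callable such as run_this_func, the first argument would be kwargs['dag_run'].conf, and the second would be whichever datetime from the task context marks the end of your processing window.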
This should work, as per the airflow documentation: https://airflow.apache.org/cli.html#trigger_dag
airflow trigger_dag -c '{"key1":1, "key2":2}' dag_id
Make sure the value of -c is a valid JSON string, so the double quotes wrapping the keys are necessary here.
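One way to avoid quoting mistakes is to generate the -c value rather than type it by hand. The sketch below uses only the Python standard library: json.dumps always emits double-quoted keys, and shlex.quote wraps the result for a POSIX shell. The dag_id and the conf contents are the placeholder values from the command above.

```python
import json
import shlex

conf = {"key1": 1, "key2": 2}

# json.dumps produces valid JSON with double-quoted keys.
payload = json.dumps(conf)

# shlex.quote adds the single quotes a POSIX shell needs around the payload.
command = "airflow trigger_dag -c {} {}".format(shlex.quote(payload), "dag_id")
print(command)

# A Python-style dict literal with single quotes is not valid JSON.
try:
    json.loads("{'key1': 1}")
    single_quotes_ok = True
except json.JSONDecodeError:
    single_quotes_ok = False
```

Parsing the payload with json.loads before triggering is a cheap check that the string will be accepted as JSON.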