I found the following link:
https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand
which explains how to use TriggerDagRunOperator to trigger a separate Airflow DAG. The documentation uses Airflow's own example DAGs, but I have a hard time understanding them because they don't use any sensors.
Can somebody explain how to start a separate DAG using TriggerDagRunOperator and SqlSensor? I'm trying to start a separate DAG when my SQL Server job task finishes. I know how to check the status of the SQL Server job using SqlSensor, but I don't know how to attach its result to a TriggerDagRunOperator so that it starts the separate DAG.
I don't want to use the Airflow CLI or put both tasks in one DAG. Basically, I want the second DAG to be trigger-only.
Below is my current code, which is missing the crucial conditionally_trigger function:
# File name: check-when-db1-sql-task-is-done
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.sensors import SqlSensor

default_args = {
    'owner': 'airflow',
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'check-when-db1-sql-task-is-done',
    description='Check-when-DB1-SQL-task-is-done',
    default_args=default_args,
    schedule_interval='@once',
    start_date=datetime.now(),
)

# Returns 0 or 1 based on the job task status
sqlsensor = SqlSensor(
    task_id='sql-sensor',
    poke_interval=30,
    timeout=3200,
    sql="""select last_run_outcome from msdb.dbo.sysjobsteps
           where job_id = '249A5A5D-6AFC-4D6B-8CB1-27C16724A450'
           and step_id = '1'
           and last_run_date = (select convert(varchar(24), getdate(), 112));""",
    mssql_conn_id='db1',
    dag=dag,
)

# The DAG to start
trigger = TriggerDagRunOperator(
    task_id='start-ssh-job',
    trigger_dag_id="qa-knime-ssh-task",
    python_callable=conditionally_trigger,
    params={'condition_param': True,
            'message': 'Hello World'},
    dag=dag,
)

# Run the trigger only after the sensor succeeds
trigger.set_upstream(sqlsensor)
My understanding is that TriggerDagRunOperator is for when you want to use a Python function to decide whether or not to trigger the target DAG. That function is called conditionally_trigger in your code and in the examples. In your case you are using a sensor to control the flow, so you do not need a conditional function. You could use a SubDagOperator instead of TriggerDagRunOperator, or pass a simple always-true function as the python_callable:
...
python_callable=lambda context, dag_run_obj: dag_run_obj,
...
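For reference, in Airflow 1.x the python_callable receives the task context and a DagRunOrder object; returning the object triggers the target DAG, and returning None skips the trigger. Here's a minimal, self-contained sketch of that contract, with a placeholder class standing in for Airflow's actual DagRunOrder (an assumption, so the snippet runs without Airflow installed):

```python
class DagRunOrder:
    """Placeholder mirroring Airflow 1.x's DagRunOrder (run_id + payload)."""
    def __init__(self, run_id=None, payload=None):
        self.run_id = run_id
        self.payload = payload


def always_trigger(context, dag_run_obj):
    # Returning the object unconditionally means the target DAG is always triggered.
    return dag_run_obj


def conditionally_trigger(context, dag_run_obj):
    # Trigger only when the calling task's params say so; returning None skips.
    if context['params']['condition_param']:
        dag_run_obj.payload = {'message': context['params']['message']}
        return dag_run_obj
    return None


# How the operator would invoke the callable, roughly:
ctx = {'params': {'condition_param': True, 'message': 'Hello World'}}
result = conditionally_trigger(ctx, DagRunOrder(run_id='manual_run'))
print(result.payload)  # {'message': 'Hello World'}
```

With a sensor already gating the flow, the always_trigger variant (or the lambda above) is enough; the conditional variant is only needed when the decision itself lives in Python.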