
How do I trigger an Airflow DAG using TriggerDagRunOperator?

I have found the following link:

https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand

It explains how to use TriggerDagRunOperator to execute a separate Airflow DAG. It uses Airflow's own example DAGs, but I have a hard time understanding them because they do not use any sensors.

Can somebody explain how I can start a separate DAG using TriggerDagRunOperator and SqlSensor? I'm trying to start a separate DAG when my SQL Server job task has finished. I know how to check the status of the SQL Server job with SqlSensor, but I don't know how to pass its result to TriggerDagRunOperator so that it starts the separate DAG.

I don't want to use the Airflow CLI or run both tasks in one DAG. Basically, I want this DAG to do nothing except trigger the other one.

Below is my current code, which is missing the crucial conditionally_trigger function:

# File Name: check-when-db1-sql-task-is-done

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.sensors import SqlSensor


default_args = {
        'owner': 'airflow',
        'retry_delay': timedelta(minutes=5),
}

dag = DAG('check-when-db1-sql-task-is-done',
        description='Check-when-DB1-SQL-task-is-done',
        default_args=default_args,
        schedule_interval='@once',
        start_date=datetime.now(),
        )

# returns-0-or-1-based-on-job-task-status
sqlsensor = SqlSensor(
        task_id='sql-sensor',
        poke_interval=30,
        timeout=3200,
        sql="""select last_run_outcome from msdb.dbo.sysjobsteps where job_id = '249A5A5D-6AFC-4D6B-8CB1-27C16724A450' and step_id = '1' and last_run_date = (select convert(varchar(24),getdate(),112)); """,
        conn_id='db1',  # SqlSensor (Airflow 1.x) takes conn_id, not mssql_conn_id
        dag=dag,
        )

# dag-to-start
trigger = TriggerDagRunOperator(
        task_id='start-ssh-job',
        trigger_dag_id="qa-knime-ssh-task",
        python_callable=conditionally_trigger,  # not defined yet: this is the missing piece
        params={'condition_param': True,
                'message': 'Hello World'},
        dag=dag)
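It helps to know what the sensor actually treats as "done". The sketch below is a simplified model, written from memory of Airflow 1.x's SqlSensor.poke(), so treat it as an assumption rather than the library code: the sensor keeps waiting while the query returns no rows or the first cell of the first row is 0 or empty, and succeeds on anything else. The function name sql_sensor_would_succeed is hypothetical. The last_run_outcome codes are the ones SQL Server documents for msdb.dbo.sysjobsteps.

```python
# Simplified model (assumption) of the success rule in Airflow 1.x SqlSensor.poke():
# no rows, or a first cell of 0 / '' -> keep poking; anything else -> success.
def sql_sensor_would_succeed(records):
    """records: list of row tuples, as a DB hook's get_records(sql) would return."""
    if len(records) == 0:
        return False
    return str(records[0][0]) not in ('0', '')


# msdb.dbo.sysjobsteps.last_run_outcome codes:
# 0 = Failed, 1 = Succeeded, 2 = Retry, 3 = Canceled, 5 = Unknown
for outcome in (0, 1, 2, 3, 5):
    print(outcome, sql_sensor_would_succeed([(outcome,)]))

# No row for today's date means the step has not run yet: keep waiting.
print('no rows', sql_sensor_would_succeed([]))
```

Under this model, a step that has not run today (no row) or that failed (0) keeps the sensor poking until its timeout, but Retry, Canceled and Unknown would release it just like Succeeded. If only a successful run should start the other DAG, the query could map every other outcome to 0, for example with `case when last_run_outcome = 1 then 1 else 0 end`.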
Asked Aug 08 '17 12:08 by Mika Heino



1 Answer

My understanding is that TriggerDagRunOperator is meant for cases where you want a Python function to decide whether or not to trigger the target DAG. In your code and in the examples, that function is called conditionally_trigger.
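For reference, a conditionally_trigger in the shape used by Airflow 1.x's example controller DAG could look like the sketch below. It assumes the 1.x contract for python_callable: the operator calls it with the task context and a DagRunOrder-like object, and triggers the target DAG only if that object is returned (its payload becomes the new run's conf). The SimpleNamespace objects in the usage lines are hypothetical stand-ins for what Airflow would pass in.

```python
from types import SimpleNamespace


def conditionally_trigger(context, dag_run_obj):
    """Return dag_run_obj to trigger the target DAG, or None to skip it.

    Assumes the Airflow 1.x TriggerDagRunOperator contract described above.
    """
    params = context['params']  # the operator's params=... dict
    if params['condition_param']:
        # The payload is handed to the triggered DAG run as its conf.
        dag_run_obj.payload = {'message': params['message']}
        return dag_run_obj
    return None  # falsy return value: no DAG run is created


# Hypothetical stand-ins for the objects Airflow would pass in:
order = SimpleNamespace(run_id='trig__example', payload=None)
ctx = {'params': {'condition_param': True, 'message': 'Hello World'}}
result = conditionally_trigger(ctx, order)
print(result.payload)
```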

In your case, you are using a sensor to control the flow, so the function does not need to decide anything. You could use a SubDagOperator instead of TriggerDagRunOperator, or pass a function that always returns dag_run_obj (and therefore always triggers) as the python_callable:

...
python_callable=lambda context, dag_run_obj: dag_run_obj,
...
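To see why returning dag_run_obj is enough, here is a simplified, hypothetical model of what Airflow 1.x's TriggerDagRunOperator does with python_callable: build a DagRunOrder, pass it to the callable with the context, and create a run of trigger_dag_id only if a truthy object comes back. FakeDagRunOrder and would_trigger are illustrative stand-ins, not Airflow APIs. In the real DAG file the gating comes from the task dependency, e.g. `sqlsensor >> trigger` (or `trigger.set_upstream(sqlsensor)`): with the default trigger rule, the trigger task only runs after the sensor has succeeded.

```python
from datetime import datetime


class FakeDagRunOrder:
    """Stand-in mirroring the (run_id, payload) shape of Airflow 1.x's DagRunOrder."""

    def __init__(self, run_id=None, payload=None):
        self.run_id = run_id
        self.payload = payload


def would_trigger(python_callable, context):
    """Simplified model (assumption) of TriggerDagRunOperator.execute() in Airflow 1.x."""
    dro = FakeDagRunOrder(run_id='trig__' + datetime.now().isoformat())
    dro = python_callable(context, dro)
    # Airflow would create the DAG run here only when dro is truthy.
    return bool(dro)


always_trigger = lambda context, dag_run_obj: dag_run_obj  # the answer's callable
never_trigger = lambda context, dag_run_obj: None

print(would_trigger(always_trigger, {}))  # True
print(would_trigger(never_trigger, {}))   # False
```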
Answered Oct 12 '22 13:10 by 7yl4r