Airflow: Set custom run_id for TriggerDagRunOperator

Tags:

airflow

When using TriggerDagRunOperator to trigger another DAG, the triggered run just gets a generic run_id of the form trig__<timestamp>.


Is it possible to give this run id a meaningful name so I can easily identify different dag runs?

asked Oct 04 '19 12:10 by David Schuler


People also ask

What is TriggerDagRunOperator in Airflow?

The TriggerDagRunOperator is an easy way to implement cross-DAG dependencies from the upstream DAG: it lets a task in one DAG trigger another DAG in the same Airflow environment.

Is it possible to wait for the DAG triggered by the TriggerDagRunOperator to complete before executing the next task?

If you need to wait for another DAG to complete first, consider using ExternalTaskSensor instead of TriggerDagRunOperator. This sensor waits until the other DAG (or one of its tasks) reaches a specified status (by default "success") before moving forward. In Airflow 2.x, TriggerDagRunOperator also accepts wait_for_completion=True, which makes the triggering task wait for the triggered run to finish.
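Both ExternalTaskSensor and TriggerDagRunOperator's wait_for_completion work by periodically re-checking the other run's state. The plain-Python sketch below shows that polling pattern in isolation; it is not Airflow's implementation, and `wait_for_state` and its `get_state` callable (standing in for a metadata-database lookup) are hypothetical names chosen for this example.

```python
import time
from typing import Callable, Iterable


def wait_for_state(
    get_state: Callable[[], str],
    allowed_states: Iterable[str] = ("success",),
    failed_states: Iterable[str] = ("failed",),
    poke_interval: float = 60.0,
    timeout: float = 3600.0,
) -> str:
    """Poll get_state() until it returns an allowed or a failed state.

    Returns the allowed state that was reached; raises RuntimeError on a
    failed state and TimeoutError if neither is reached within `timeout`.
    """
    allowed, failed = set(allowed_states), set(failed_states)
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state in allowed:
            return state
        if state in failed:
            raise RuntimeError(f"external run ended in state {state!r}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {state!r} after {timeout} seconds")
        # Like a sensor's poke_interval: sleep between checks
        time.sleep(poke_interval)
```

A long poke_interval keeps load on the metadata database low; the trade-off is that the downstream task starts up to one interval later than it could.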

How do you pass parameters in Airflow?

You can pass parameters from the CLI using --conf '{"key":"value"}' and then read them in the DAG file as {{ dag_run.conf["key"] }} in a templated field.
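Because the --conf value must be valid JSON, it can help to generate it with json.dumps instead of hand-writing the quoting. A minimal sketch, assuming the Airflow 2.x CLI form `airflow dags trigger [--conf JSON] [--run-id RUN_ID] dag_id` (Airflow 1.10 used `airflow trigger_dag` instead); the helper name `trigger_command` and the DAG id `my_dag` are hypothetical. The optional run_id also shows how a manually triggered run can be given a readable name.

```python
import json
import shlex
from typing import Any, Dict, List, Optional


def trigger_command(dag_id: str, conf: Dict[str, Any], run_id: Optional[str] = None) -> List[str]:
    """Return the argv for `airflow dags trigger`, with conf serialised as JSON."""
    # json.dumps guarantees valid JSON (double quotes, escaped characters)
    argv = ["airflow", "dags", "trigger", "--conf", json.dumps(conf)]
    if run_id is not None:
        argv += ["--run-id", run_id]
    argv.append(dag_id)
    return argv


argv = trigger_command("my_dag", {"key": "value"})
# shlex.join (Python 3.8+) quotes each argument for pasting into a POSIX shell
print(shlex.join(argv))  # airflow dags trigger --conf '{"key": "value"}' my_dag
```

The argv list can also be passed straight to subprocess.run, which avoids shell quoting altogether.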


1 Answer

You can't set the run_id through a constructor argument of TriggerDagRunOperator in Airflow 1.10.x, because the run_id is generated inside the operator's execute method. However, you could implement your own operator, CustomTriggerDagOperator, that subclasses TriggerDagRunOperator and overrides execute to build the run_id the way you want. For example:

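# Written against Airflow 1.10.x (DagRunOrder and python_callable were removed in 2.0)
import json

from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils import timezone


class CustomTriggerDagOperator(TriggerDagRunOperator):
    """TriggerDagRunOperator that appends the target DAG id to the run_id."""

    def execute(self, context):
        # Same default as the parent operator: trig__<execution_date or now>
        if self.execution_date is not None:
            run_id = 'trig__{}'.format(self.execution_date)
            self.execution_date = timezone.parse(self.execution_date)
        else:
            run_id = 'trig__' + timezone.utcnow().isoformat()

        # Customisation: append the triggered DAG's id, with a separator
        run_id += f'__{self.trigger_dag_id}'

        dro = DagRunOrder(run_id=run_id)
        # An optional python_callable can still modify or cancel the DagRunOrder
        if self.python_callable is not None:
            dro = self.python_callable(context, dro)
        if dro:
            trigger_dag(dag_id=self.trigger_dag_id,
                        run_id=dro.run_id,
                        conf=json.dumps(dro.payload),
                        execution_date=self.execution_date,
                        replace_microseconds=False)
        else:
            self.log.info("Criteria not met, moving on")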

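The example above appends the id of the triggered DAG to the default run_id, giving trig__<timestamp>__<trigger_dag_id>. You could use the same strategy to set the run_id to anything you like. Since the operator passes dro.run_id to trigger_dag, a python_callable that changes dro.run_id before returning it would also work in 1.10.x. In Airflow 2.x, DagRunOrder and python_callable no longer exist; recent 2.x releases accept a trigger_run_id argument on TriggerDagRunOperator instead.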
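If you want the naming logic in one place, and unit-testable without an Airflow installation, you could factor it into a plain function and call it from execute() or from a python_callable. A minimal sketch; the function name `build_run_id`, the `__` separator and the optional label are assumptions made for this example, and only the trig__ prefix mirrors the operator's default.

```python
from datetime import datetime, timezone
from typing import Optional


def build_run_id(trigger_dag_id: str, when: datetime, label: Optional[str] = None) -> str:
    """Build a readable run_id: 'trig__<iso timestamp>__<dag id>[__<label>]'."""
    if when.tzinfo is None:
        # Treat naive datetimes as UTC, the timezone Airflow stores dates in
        when = when.replace(tzinfo=timezone.utc)
    parts = ["trig", when.isoformat(), trigger_dag_id]
    if label:
        # Optional extra context, e.g. a region or batch name
        parts.append(label)
    return "__".join(parts)


print(build_run_id("load_sales", datetime(2019, 10, 4, 12, 10), "eu"))
# trig__2019-10-04T12:10:00+00:00__load_sales__eu
```

Inside execute() you would pass the parsed execution date (or the current UTC time) as `when`.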

answered Nov 15 '22 08:11 by PirateNinjas