Generating a UUID and using it across an Airflow DAG

I'm trying to create a dynamic Airflow DAG that has the following two tasks:

Task 1: creates files with a generated UUID as part of their name
Task 2: runs a check on those files

So I defined a variable FILE_UUID and set it as follows: FILE_UUID = str(uuid.uuid4()). I also created a constant file name: MY_FILE = '{file_uuid}_file.csv'.format(file_uuid=FILE_UUID).

Then Task 1, a BashOperator, gets MY_FILE as part of its command and creates a file successfully. I can see that the generated files include a specific UUID in their names.

Task 2, a PythonOperator that gets MY_FILE via op_args, fails: it can't access the file. The logs show that it tries to access files with a different UUID.
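
Roughly, the DAG looks like this (simplified; the dag_id, task ids and the check function are just placeholders):

import uuid
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

FILE_UUID = str(uuid.uuid4())  # my "constant"
MY_FILE = '{file_uuid}_file.csv'.format(file_uuid=FILE_UUID)

def check_file(path):
    # placeholder check: just read the file back
    with open(path) as f:
        print(f.read())

with DAG(dag_id='uuid_dag',
         schedule_interval='@daily',
         start_date=datetime(2019, 4, 18)) as dag:

    create_file = BashOperator(
        task_id='create_file',
        bash_command='touch ' + MY_FILE
    )

    check = PythonOperator(
        task_id='check_file',
        python_callable=check_file,
        op_args=[MY_FILE]
    )

    create_file >> check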

Why is my "constant" is being run separately on every task? Is there any way to prevent that from happening?

I'm using Airflow 1.10 with the LocalExecutor.

I tried setting the constant both outside and inside the "with DAG" block, and I also tried working with macros, but then the PythonOperator just uses the macro strings literally instead of the values they hold.

asked Apr 18 '19 by JustinCase


People also ask

How do you pass variables to Airflow DAG?

You can pass parameters from the CLI using --conf '{"key":"value"}' and then use them in the DAG file as "{{ dag_run.conf["key"] }}" in a templated field.
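
For example (a sketch; the dag_id my_dag, the task id and the existing dag object are assumptions), after triggering with airflow trigger_dag my_dag --conf '{"key":"value"}' you could read the value in a templated field like this:

from airflow.operators.bash_operator import BashOperator

# dag_run.conf is only populated for runs triggered with --conf;
# `dag` is assumed to be an existing DAG object.
print_conf = BashOperator(
    task_id='print_conf',
    bash_command='echo {{ dag_run.conf["key"] }}',
    dag=dag
)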

Is Start_date mandatory in Airflow DAG?

When creating a new DAG, you probably want to set a global start_date for your tasks. This can be done by declaring your start_date directly in the DAG() object. The first DagRun to be created will be based on the min(start_date) for all your tasks.
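
For example (the dag_id and dates are placeholders), a start_date declared on the DAG object is inherited by its tasks as their default:

from datetime import datetime
from airflow.models import DAG

dag = DAG(
    dag_id='example_dag',
    start_date=datetime(2019, 4, 18),   # default start_date for all tasks in this DAG
    schedule_interval='@daily'
)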

Can a DAG trigger another DAG Airflow?

The TriggerDagRunOperator is an easy way to implement cross-DAG dependencies. This operator allows you to have a task in one DAG that triggers another DAG in the same Airflow environment.
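
A minimal sketch (the import path shown is the Airflow 1.x one, the target dag_id and the existing dag object are assumptions):

from airflow.operators.dagrun_operator import TriggerDagRunOperator

# Triggers a run of the DAG with dag_id 'target_dag';
# `dag` is assumed to be an existing DAG object.
trigger_target = TriggerDagRunOperator(
    task_id='trigger_target_dag',
    trigger_dag_id='target_dag',
    dag=dag
)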



1 Answer

You have to keep in mind that the DAG definition file is a sort of "configuration script", not an actual executable that runs your DAGs. The tasks are executed in completely different environments, often not even on the same machine. Think of it like a configuration XML file that sets up your tasks, which are then built and executed on some other machine in the cloud, except it's Python instead of XML.

In conclusion: your DAG code is Python, but it is not the code being executed at task runtime. Any random UUID you generate at the top level of the file will be re-evaluated at an unknown time and multiple times, once for each task, possibly on different machines.

To have it consistent across tasks you need to find another way, for example:

  • use XCom: have the first task generate the UUID and push it to XCom, then have all downstream tasks pull it from there.
  • anchor your UUID to something that is constant across the pipeline run, such as a source or a date (e.g. for a daily DAG you can build the UUID from the date, mixing in some DAG/task specifics, so that it is the same for all tasks of a run but unique per day); a sketch of this approach follows the XCom example below.

Example DAG using the first method (XComs):

from datetime import datetime
import uuid

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

with DAG(dag_id='global_uuid',
         schedule_interval='@daily',
         start_date=...) as dag:

    generate_uuid = PythonOperator(
        task_id='generate_uuid',
        # the return value of python_callable is automatically pushed to XCom
        python_callable=lambda: str(uuid.uuid4())
    )

    print_uuid1 = BashOperator(
        task_id='print1',
        # pulls the value pushed by the 'generate_uuid' task
        bash_command='echo {{ task_instance.xcom_pull("generate_uuid") }}'
    )

    print_uuid2 = BashOperator(
        task_id='print2',
        bash_command='echo {{ task_instance.xcom_pull("generate_uuid") }}'
    )

    generate_uuid >> print_uuid1 >> print_uuid2
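
And a minimal sketch of the second method (the dag_id, task ids and namespace string are placeholders): each task independently derives the same UUID from the templated execution date (ds), so no XCom is needed:

from datetime import datetime
import uuid

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

def process_file(ds, **_):
    # same input -> same UUID in every task of the same DAG run
    run_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, 'my_dag_' + ds)
    my_file = '{}_file.csv'.format(run_uuid)
    print(my_file)

with DAG(dag_id='anchored_uuid',
         schedule_interval='@daily',
         start_date=datetime(2019, 4, 18)) as dag:

    task_1 = PythonOperator(task_id='task_1',
                            python_callable=process_file,
                            provide_context=True)

    task_2 = PythonOperator(task_id='task_2',
                            python_callable=process_file,
                            provide_context=True)

    task_1 >> task_2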

answered Sep 28 '22 by bosnjak