I'm trying to create a dynamic Airflow DAG that has the following two tasks:
Task 1: creates files with a generated UUID as part of their names.
Task 2: runs a check on those files.
So I defined a variable FILE_UUID and set it as follows: str(uuid.uuid4()). I also created a constant file name: MY_FILE = '{file_uuid}_file.csv'.format(file_uuid=FILE_UUID).
Task 1 is a BashOperator that gets MY_FILE as part of its command, and it creates a file successfully. I can see that the generated files include a specific UUID in their names.
Task 2 is a PythonOperator that gets MY_FILE as one of its op_args, but it fails because it can't access the file. The logs show that it tries to access a file with a different UUID.
Why is my "constant" being evaluated separately for every task? Is there any way to prevent that from happening?
I'm using Airflow 1.10 with the LocalExecutor.
I tried setting the constant both outside and inside the "with DAG" block, and I also tried working with macros, but then the PythonOperator just uses the macro strings literally instead of the values they hold.
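This is roughly how my DAG is set up (the dag id, task ids, dates, and the check itself are simplified placeholders):

import uuid
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Evaluated at module level, i.e. every time the DAG file is parsed
FILE_UUID = str(uuid.uuid4())
MY_FILE = '{file_uuid}_file.csv'.format(file_uuid=FILE_UUID)

def check_file(file_name):
    # Simplified stand-in for the real check
    print('checking {}'.format(file_name))

with DAG(dag_id='uuid_example',
         schedule_interval='@daily',
         start_date=datetime(2019, 1, 1)) as dag:

    create_file = BashOperator(
        task_id='create_file',
        bash_command='touch {}'.format(MY_FILE)
    )

    check_file_task = PythonOperator(
        task_id='check_file',
        python_callable=check_file,
        op_args=[MY_FILE]
    )

    create_file >> check_file_task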
You can pass parameters in from the CLI using --conf '{"key":"value"}' and then read them in the DAG file with "{{ dag_run.conf["key"] }}" in a templated field.
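For instance, you could decide the UUID at trigger time and reference it in any templated field; the dag id, key name, and command below are only illustrative:

# Trigger the DAG from the CLI, passing the value every task should see:
#   airflow trigger_dag uuid_example --conf '{"file_uuid": "1234-abcd"}'

from airflow.operators.bash_operator import BashOperator

# Inside the with DAG(...) block:
create_file = BashOperator(
    task_id='create_file',
    bash_command='touch {{ dag_run.conf["file_uuid"] }}_file.csv'
)

Note that dag_run.conf is only populated for runs actually triggered with --conf, so scheduled runs would need a fallback.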
You have to keep in mind that the DAG definition file is a sort of "configuration script", not an actual executable that runs your DAGs. The tasks are executed in completely different environments, most of the time not even on the same machine. Think of it like a configuration XML file which sets up your tasks, which are then built and executed on some other machine in the cloud - but it's Python instead of XML.
In conclusion: your DAG code is Python, but it's not the code being executed in the runtime of your tasks. So if you generate a random UUID at the top level of the file, it will get evaluated at an unknown time and multiple times - for each task, on different machines.
To have it consistent across tasks you need to find another way, for example: generate the UUID in an upstream task and share it with the downstream tasks through XComs, or decide on the value outside the DAG entirely and pass it in via dag_run.conf when triggering.
Example DAG using the first method (XComs):
from datetime import datetime
import uuid

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

with DAG(dag_id='global_uuid',
         schedule_interval='@daily',
         start_date=...) as dag:

    generate_uuid = PythonOperator(
        task_id='generate_uuid',
        python_callable=lambda: str(uuid.uuid4())
    )

    print_uuid1 = BashOperator(
        task_id='print1',
        bash_command='echo {{ task_instance.xcom_pull("generate_uuid") }}'
    )

    print_uuid2 = BashOperator(
        task_id='print2',
        bash_command='echo {{ task_instance.xcom_pull("generate_uuid") }}'
    )

    generate_uuid >> print_uuid1 >> print_uuid2
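In Airflow 1.10 the value returned by a PythonOperator's callable is pushed to XCom automatically, which is why xcom_pull with just the upstream task id works in the templated bash_command. To adapt this to the check task from the question, the downstream PythonOperator can pull the same value through its context; the names below are illustrative, not part of the example above:

def check_file(**context):
    # Pull the UUID returned by the generate_uuid task
    file_uuid = context['ti'].xcom_pull(task_ids='generate_uuid')
    file_name = '{}_file.csv'.format(file_uuid)
    # ... run the actual check on file_name here ...

check_file_task = PythonOperator(
    task_id='check_file',
    python_callable=check_file,
    provide_context=True
)

generate_uuid >> check_file_task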