I mostly see Airflow being used for ETL/Bid data related jobs. I'm trying to use it for business workflows wherein a user action triggers a set of dependent tasks in future. Some of these tasks may need to be cleared (deleted) based on certain other user actions. I thought the best way to handle this would be via dynamic task ids. I read that Airflow supports dynamic dag ids. So, I created a simple python script that takes DAG id and task id as command line parameters. However, I'm running into problems making it work. It gives dag_id not found error. Has anyone tried this? Here's the code for the script (call it tmp.py) which I execute on command line as python (python tmp.py 820 2016-08-24T22:50:00 ):
from __future__ import print_function
import os
import sys
import shutil
from datetime import date, datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
execution = '2016-08-24T22:20:00'
if len(sys.argv) > 2 :
dagid = sys.argv[1]
taskid = 'Activate' + sys.argv[1]
execution = sys.argv[2]
else:
dagid = 'DAGObjectId'
taskid = 'Activate'
default_args = {'owner' : 'airflow', 'depends_on_past': False, 'start_date':date.today(), 'email': ['[email protected]'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1}
dag = DAG(dag_id = dagid,
default_args=default_args,
schedule_interval='@once',
)
globals()[dagid] = dag
task1 = BashOperator(
task_id = taskid,
bash_command='ls -l',
dag=dag)
fakeTask = BashOperator(
task_id = 'fakeTask',
bash_command='sleep 5',
retries = 3,
dag=dag)
task1.set_upstream(fakeTask)
airflowcmd = "airflow run " + dagid + " " + taskid + " " + execution
print("airflowcmd = " + airflowcmd)
os.system(airflowcmd)
Overview. In Airflow, DAGs are defined as Python code. Airflow executes all Python code in the dags_folder and loads any DAG objects that appear in globals() . The simplest way of creating a DAG is to write it as a static Python file. However, sometimes manually writing DAGs isn't practical.
Airflow's dynamic task mapping feature is built off of the MapReduce programming model. The map procedure takes a set of inputs and creates a single task for each one. The reduce procedure, which is optional, allows a task to operate on the collected output of a mapped task.
The unique identifier or DAG ID When you instantiate a DAG object you have to specify a DAG ID. The DAG ID must be unique across all of your DAGs. You should never have two DAGs with the same DAG ID otherwise only one DAG will show up and you might get unexpected behaviours.
In Airflow, a DAG – or a Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG is defined in a Python script, which represents the DAGs structure (tasks and their dependencies) as code.
After numerous trials and errors, I was able to figure this out. Hopefully, it will help someone. Here's how it works: You need to have an iterator or an external source (file/database table) to generate dags/task dynamically through a template. You can keep the dag and task names static, just assign them ids dynamically in order to differentiate one dag from the other. You put this python script in the dags folder. When you start the airflow scheduler, it runs through this script on every heartbeat and writes the DAGs to the dag table in the database. If a dag (unique dag id) has already been written, it will simply skip it. The scheduler also look at the schedule of individual DAGs to determine which one is ready for execution. If a DAG is ready for execution, it executes it and updates its status. Here's a sample code:
from airflow.operators import PythonOperator
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta
import sys
import time
dagid = 'DA' + str(int(time.time()))
taskid = 'TA' + str(int(time.time()))
input_file = '/home/directory/airflow/textfile_for_dagids_and_schedule'
def my_sleeping_function(random_base):
'''This is a function that will run within the DAG execution'''
time.sleep(random_base)
def_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.now(), 'email_on_failure': False,
'retries': 1, 'retry_delay': timedelta(minutes=2)
}
with open(input_file,'r') as f:
for line in f:
args = line.strip().split(',')
if len(args) < 6:
continue
dagid = 'DAA' + args[0]
taskid = 'TAA' + args[0]
yyyy = int(args[1])
mm = int(args[2])
dd = int(args[3])
hh = int(args[4])
mins = int(args[5])
ss = int(args[6])
dag = DAG(
dag_id=dagid, default_args=def_args,
schedule_interval='@once', start_date=datetime(yyyy,mm,dd,hh,mins,ss)
)
myBashTask = BashOperator(
task_id=taskid,
bash_command='python /home/directory/airflow/sendemail.py',
dag=dag)
task2id = taskid + '-X'
task_sleep = PythonOperator(
task_id=task2id,
python_callable=my_sleeping_function,
op_kwargs={'random_base': 10},
dag=dag)
task_sleep.set_upstream(myBashTask)
f.close()
From How can I create DAGs dynamically?:
Airflow looks in you [sic] DAGS_FOLDER for modules that contain DAG objects in their global namespace, and adds the objects it finds in the DagBag. Knowing this all we need is a way to dynamically assign variable in the global namespace, which is easily done in python using the globals() function for the standard library which behaves like a simple dictionary.
for i in range(10): dag_id = 'foo_{}'.format(i) globals()[dag_id] = DAG(dag_id) # or better, call a function that returns a DAG object!
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With