I am reading list of elements from an external file and looping over elements to create a series of tasks.
For example, if there are 2 elements in the file - [A, B]. There will be 2 series of tasks:
A1 -> A2 ..
B1 -> B2 ...
This reading elements logic is not part of any task but in the DAG itself. Thus Scheduler is calling it many times a day while reading the DAG file. I want to call it only during DAG runtime.
Wondering if there is already a pattern for such kind of use cases?
Airflow's dynamic task mapping feature is built off of the MapReduce programming model. The map procedure takes a set of inputs and creates a single task for each one. The reduce procedure, which is optional, allows a task to operate on the collected output of a mapped task.
Depending on your requirements, if what you are looking for is to avoid reading a file many times, but you don't mind reading from the metadata database as many times instead, then you could change your approach to use Variables
as the source of iteration to dynamically create tasks.
A basic example could be performing the file reading inside a PythonOperator
and set the Variables
you will use to iterate later on (same callable):
sample_file.json:
{
"cities": [ "London", "Paris", "BA", "NY" ]
}
Task definition:
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import json
def _read_file():
with open('dags/sample_file.json') as f:
data = json.load(f)
Variable.set(key='list_of_cities',
value=data['cities'], serialize_json=True)
print('Loading Variable from file...')
def _say_hello(city_name):
print('hello from ' + city_name)
with DAG('dynamic_tasks_from_var', schedule_interval='@once',
start_date=days_ago(2),
catchup=False) as dag:
read_file = PythonOperator(
task_id='read_file',
python_callable=_read_file
)
Then you could read from that variable and create the dynamic tasks. (It's important to set a default_var
). The TaskGroup
is optional.
# Top-level code
updated_list = Variable.get('list_of_cities',
default_var=['default_city'],
deserialize_json=True)
print(f'Updated LIST: {updated_list}')
with TaskGroup('dynamic_tasks_group',
prefix_group_id=False,
) as dynamic_tasks_group:
for index, city in enumerate(updated_list):
say_hello = PythonOperator(
task_id=f'say_hello_from_{city}',
python_callable=_say_hello,
op_kwargs={'city_name': city}
)
# DAG level dependencies
read_file >> dynamic_tasks_group
In the Scheduler logs, you will only find:
INFO - Updated LIST: ['London', 'Paris', 'BA', 'NY']
Dag Graph View:
With this approach, the top-level code, hence read by the Scheduler continuously, is the call to Variable.get()
method. If you need to read from many variables, it's important to remember that it's recommended to store them in one single JSON value to avoid constantly create connections to the metadata database (example in this article).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With