Currently I create multiple tasks using an Airflow Variable like this, and it works fine:
with DAG(....) as dag:
    body = Variable.get("config_table", deserialize_json=True)
    for i in range(len(body.keys())):
        simple_task = Operator(
            task_id='task_' + str(i),
            .....
But I need to use an XCom value instead of a Variable. Is it possible to dynamically create tasks from an XCom pull value?
I tried to set the value like this, but it doesn't work:
body = "{{ ti.xcom_pull(key='config_table', task_ids='get_config_table') }}"
Yes, it's possible to dynamically create tasks from XComs generated by a previous task; there are more extensive discussions on this topic, for example in this question. One of the suggested approaches follows this structure; here is a working example I made:
sample_file.json:
{
"cities": [ "London", "Paris", "BA", "NY" ]
}
Get the data from a file, an API, or any other source, and push it to XCom:
def _process_obtained_data(ti):
    list_of_cities = ti.xcom_pull(task_ids='get_data')
    Variable.set(key='list_of_cities',
                 value=list_of_cities['cities'], serialize_json=True)

def _read_file():
    with open('dags/sample_file.json') as f:
        data = json.load(f)
        # push to XCom using return
        return data
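The _print_greeting callable used by the dynamic tasks further down is not shown in the original snippet; a minimal version could look like this:

def _print_greeting(city_name, greeting):
    # simple callable for the dynamically created tasks
    print(f"{greeting} from {city_name}!")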
with DAG('dynamic_tasks_example', schedule_interval='@once',
         start_date=days_ago(2),
         catchup=False) as dag:

    get_data = PythonOperator(
        task_id='get_data',
        python_callable=_read_file)

Use a second task to pull the XCom and set a Variable with the data you will use to iterate later on:

    preparation_task = PythonOperator(
        task_id='preparation_task',
        python_callable=_process_obtained_data)
Of course, if you want you can merge both tasks into one; I prefer not to, because usually I take only a subset of the fetched data to create the Variable.
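If you do prefer a single task, a rough sketch of a merged callable (the name _read_and_prepare is made up for illustration) could be:

def _read_and_prepare():
    # read the file and set the Variable in one task
    with open('dags/sample_file.json') as f:
        data = json.load(f)
    Variable.set(key='list_of_cities',
                 value=data['cities'], serialize_json=True)
    return data

A single PythonOperator pointing at it would then replace both get_data and preparation_task.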
Read that Variable and later iterate on it. It's critical to define default_var:

    end = DummyOperator(
        task_id='end',
        trigger_rule='none_failed')

    # Top-level code within the DAG block
    iterable_list = Variable.get('list_of_cities',
                                 default_var=['default_city'],
                                 deserialize_json=True)
Declare the dynamic tasks and their dependencies within a loop, and make the task_ids unique. TaskGroup is optional; it just helps you sort the UI:

    with TaskGroup('dynamic_tasks_group',
                   prefix_group_id=False,
                   ) as dynamic_tasks_group:
        if iterable_list:
            for index, city in enumerate(iterable_list):
                say_hello = PythonOperator(
                    task_id=f'say_hello_from_{city}',
                    python_callable=_print_greeting,
                    op_kwargs={'city_name': city, 'greeting': 'Hello'}
                )
                say_goodbye = PythonOperator(
                    task_id=f'say_goodbye_from_{city}',
                    python_callable=_print_greeting,
                    op_kwargs={'city_name': city, 'greeting': 'Goodbye'}
                )

                # TaskGroup-level dependencies
                say_hello >> say_goodbye

    # DAG-level dependencies
    get_data >> preparation_task >> dynamic_tasks_group >> end
DAG Graph View:

Imports:
import json
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
Things to keep in mind:
- If you have simultaneous runs of this same DAG, all of them will use the same Variable, so you may need to make it 'unique' by differentiating their names.
- Define a default value when reading the Variable, otherwise the first execution may not be processable by the Scheduler.
Good luck!
Edit:
The Variable.get() call is top-level code, so it is read by the scheduler every 30 seconds (the default of the min_file_process_interval setting). This means a connection to the metadata DB is made on each parse.

Edit:
Added an if clause to handle the empty iterable_list case.

EDIT: Starting from Airflow 2 it's much easier to achieve; see this answer. Do note that dynamic tasks can still cause the issues I mention below.
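Since the linked answer isn't reproduced here, a minimal sketch of the feature it presumably refers to, dynamic task mapping (added in Airflow 2.3); the DAG and task names are illustrative:

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

@dag(schedule_interval='@once', start_date=days_ago(2), catchup=False)
def mapped_cities_example():

    @task
    def get_cities():
        # the return value is pushed to XCom automatically
        return ['London', 'Paris', 'BA', 'NY']

    @task
    def say_hello(city):
        print(f'Hello from {city}')

    # one mapped task instance is created per city at run time
    say_hello.expand(city=get_cities())

mapped_cities_example()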
This is not possible, and in general dynamic tasks are not recommended: the scheduler parses the DAG file outside of any run, so it cannot rely on XCom values that only exist at run time.
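To illustrate why: at parse time the templated string from the question is just a literal string; nothing has been rendered yet, so there is no data to loop over. A small sketch:

# at DAG-parse time Jinja has not been rendered, so this is only the raw template text
body = "{{ ti.xcom_pull(key='config_table', task_ids='get_config_table') }}"

print(type(body))  # <class 'str'>
print(len(body))   # length of the template text, not the number of config entries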
What you can do instead is use a branch operator: declare all of the tasks statically and just skip the ones you don't need based on the XCom value. For example:
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator

def branch_func(**context):
    # 'key' is a placeholder for the XCom key pushed by an upstream task
    return f"task_{context['ti'].xcom_pull(key=key)}"

branch = BranchPythonOperator(
    task_id="branch",
    python_callable=branch_func
)

# DummyOperator stands in for the real downstream tasks
tasks = [DummyOperator(task_id=f"task_{i}") for i in range(3)]
branch >> tasks
In some cases it's also not a good idea to use this method (for example, when you have 100 possible tasks); in those cases I'd recommend writing your own operator or using a single PythonOperator.
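As a rough sketch of that single-PythonOperator alternative, reusing the task and XCom names from the question (the processing body is made up for illustration):

def _process_all(ti):
    # pull the whole config at run time and iterate inside one task
    config = ti.xcom_pull(key='config_table', task_ids='get_config_table')
    for name, item in config.items():
        print(f'processing {name}: {item}')

process_all = PythonOperator(
    task_id='process_all',
    python_callable=_process_all)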