I am trying to set up dynamic sequence ETL jobs that use XCom to get data from the first task that runs. Here is the current code:
```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime as dt, timedelta as td, date
from airflow.models import BaseOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable

START_DT = dt.combine(dt.today(), dt.min.time())
END_DT = dt.combine(dt.today(), dt.max.time())
NOW = dt.now()
CURRENT_EXEC = '{{ execution_date }}'
TODAY_MD = dt.today().strftime("%m%d")


def datetime_range(start, end, delta):
    """Generates the date range with time separation"""
    current = start
    if not isinstance(delta, td):
        delta = td(**delta)
    while current < end:
        yield current
        current += delta


default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': START_DT,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'queue': 'etl',
    'retries': 1,
    'retry_delay': td(minutes=1),
}

dag_name = 'SEQ_TEST_01'
dag = DAG(dag_id=dag_name, default_args=default_args, schedule_interval=td(minutes=30))


def seq_job(sq_dt, **kwargs):
    for count, dt_in in enumerate(datetime_range(START_DT, END_DT, {'minutes': 30}), 1):
        if sq_dt < str(dt_in):
            curr_seq = count, dt_in, dt_in + td(minutes=29, seconds=59)
            sequence = int(curr_seq[0])
            return sequence


pycall = PythonOperator(
    task_id='seq_sensor',
    provide_context=True,
    python_callable=seq_job,
    op_kwargs={'sq_dt': CURRENT_EXEC},
    dag=dag)


def group(grp, **context):
    sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor')
    grp = '%0.2d' % grp
    database = 'TEST'
    today_date = '{{ ds_nodash }}'
    return BashOperator(
        task_id='ETL_GRP{}_{}_{}'.format(database, sequence, grp),
        bash_command='script.sh {} {} {} {}'.format(today_date, sequence, database, grp),
        dag=dag)


complete = DummyOperator(
    task_id='All_Sequences_complete',
    dag=dag)

pycall >> group(1) >> complete
pycall >> group(2) >> complete
pycall >> group(3) >> complete
```
The issue is that no matter what I try, I keep getting this error:
```
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 263, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 66, in <module>
    pycall >> group(1) >> complete
  File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 56, in group
    sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor')
KeyError: 'task_instance'
```
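The KeyError can be reproduced without Airflow at all. A function declared with `**context` only receives the keyword arguments passed in that particular call, and `group(1)` runs at module level while the DAG file is being parsed, with no keyword arguments, so `context` is an empty dict. A minimal sketch that keeps only the failing lookup from `group`:

```python
def group(grp, **context):
    # **context collects only the keyword arguments given in this call.
    # group(1) passes none, so context == {} and the lookup fails.
    return context['task_instance']


try:
    group(1)
except KeyError as exc:
    print('KeyError:', exc)  # prints: KeyError: 'task_instance'
```

Nothing injects a task context into a plain function call made at parse time; the context only exists when Airflow executes a task.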
I'm not sure whether I'm missing something small or have everything wrong. I'm still new to Airflow and am trying to set up our ETL test environment to run every 30 minutes with a unique sequence number, which is generated by datetime_range and based on the execution_date variable.
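One thing worth checking in `seq_job`: it compares the rendered `execution_date` string against `str(dt_in)`, and `START_DT`/`END_DT` are built from `dt.today()` when the file is parsed, not from the run's own date. The sketch below instead derives the sequence from a datetime object. It is a minimal sketch under stated assumptions: `sequence_for` is a hypothetical helper, and it assumes slot 1 covers 00:00:00 to 00:29:59 of the execution date. Note that the original loop returns the first slot starting strictly *after* `sq_dt` (the next slot), so the numbering may differ by one from what was intended.

```python
from datetime import datetime, timedelta


def datetime_range(start, end, delta):
    """Generates the date range with time separation (from the question)."""
    current = start
    if not isinstance(delta, timedelta):
        delta = timedelta(**delta)
    while current < end:
        yield current
        current += delta


SLOT = timedelta(minutes=30)


def sequence_for(execution_date):
    """Hypothetical helper: 1-based number of the 30-minute slot containing execution_date.

    Assumes slot 1 starts at midnight of the execution date itself.
    replace() keeps any tzinfo, so aware datetimes compare correctly.
    """
    midnight = execution_date.replace(hour=0, minute=0, second=0, microsecond=0)
    slots = datetime_range(midnight, midnight + timedelta(days=1), SLOT)
    for count, slot_start in enumerate(slots, 1):
        # Compare datetime objects rather than their string forms.
        if slot_start <= execution_date < slot_start + SLOT:
            return count
    raise ValueError('unreachable: every time of day falls in some slot')


print(sequence_for(datetime(2018, 6, 1, 0, 30)))  # prints: 2
```

Inside a `PythonOperator` callable run with `provide_context=True`, the context includes `execution_date` as a datetime, so something like `sequence_for(kwargs['execution_date'])` could replace the string comparison.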
airflow.models.taskinstance.set_current_context(context) sets the current execution context to the provided context object. This method should be called once per task execution, before calling operator.execute.
XComs (short for “cross-communications”) are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines. An XCom is identified by a key (essentially its name), as well as the task_id and dag_id it came from.
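To make the addressing concrete, here is a toy in-memory model. `ToyXComStore` is hypothetical and is not how Airflow implements XComs (Airflow persists them in its metadata database, and real XComs are also tied to a specific DAG run, which this toy ignores). It does mirror one real convention: a value returned from a `PythonOperator` callable is pushed under the key `return_value`, which is also the default key `xcom_pull` reads.

```python
class ToyXComStore:
    """Hypothetical illustration: XComs addressed by (dag_id, task_id, key)."""

    def __init__(self):
        self._rows = {}

    def push(self, dag_id, task_id, key, value):
        self._rows[(dag_id, task_id, key)] = value

    def pull(self, dag_id, task_id, key='return_value'):
        # Missing entries come back as None rather than raising.
        return self._rows.get((dag_id, task_id, key))


store = ToyXComStore()
# Roughly what happens when seq_job returns 5 in task 'seq_sensor'.
store.push('SEQ_TEST_01', 'seq_sensor', 'return_value', 5)
print(store.pull('SEQ_TEST_01', 'seq_sensor'))  # prints: 5
```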
Try to use context['ti'] instead.
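Whichever key is used, the lookup only succeeds inside code that Airflow calls while the task is running. The sketch below simulates that without Airflow; `FakeTaskInstance`, `build_command`, and `run_task` are hypothetical stand-ins, not Airflow classes or APIs. In real Airflow the context passed to a callable exposes the task instance under both `ti` and `task_instance`; the fake context mirrors that.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance; only xcom_pull is mimicked."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # task_id -> value that task returned

    def xcom_pull(self, task_ids):
        return self._xcoms.get(task_ids)


def build_command(grp, **context):
    # Called at run time, so the context (including 'ti') is populated.
    sequence = context['ti'].xcom_pull(task_ids='seq_sensor')
    return 'script.sh {} {} {} {:02d}'.format(
        context['ds_nodash'], sequence, 'TEST', grp)


def run_task(python_callable, op_kwargs, xcoms, ds_nodash):
    """Hypothetical: mimic Airflow invoking a callable with op_kwargs plus the run context."""
    ti = FakeTaskInstance(xcoms)
    context = {'ti': ti, 'task_instance': ti, 'ds_nodash': ds_nodash}
    return python_callable(**dict(op_kwargs, **context))


print(run_task(build_command, {'grp': 1}, {'seq_sensor': 5}, '20180601'))
# prints: script.sh 20180601 5 TEST 01
```

In the actual DAG, the same idea can be expressed without a Python wrapper: keep the `BashOperator` objects static and let Jinja pull the value at run time, since `bash_command` is a templated field and `ti` is available in the template context, e.g. `bash_command="script.sh {{ ds_nodash }} {{ ti.xcom_pull(task_ids='seq_sensor') }} TEST 01"`. The `task_id` itself cannot contain the sequence, because task IDs are fixed when the DAG file is parsed.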