 

Airflow XCOM KeyError: 'task_instance'

Tags: python, airflow

I am trying to set up dynamic sequence ETL jobs that use XCom to get data from the first task that runs. Here is the current code:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime as dt, timedelta as td, date
from airflow.models import BaseOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable

START_DT = dt.combine(dt.today(), dt.min.time())
END_DT = dt.combine(dt.today(), dt.max.time())
NOW = dt.now()
CURRENT_EXEC = '{{ execution_date }}'
TODAY_MD = dt.today().strftime("%m%d")

def datetime_range(start, end, delta):
    """Generates the date range with time separation"""
    current = start
    if not isinstance(delta, td):
        delta = td(**delta)
    while current < end:
        yield current
        current += delta

default_args = {
        'owner': 'test',
        'depends_on_past': False,
        'start_date': START_DT, 
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        'queue': 'etl',
        'retries': 1,
        'retry_delay': td(minutes=1),
}

dag_name = 'SEQ_TEST_01'

dag = DAG(dag_id=dag_name, default_args=default_args, schedule_interval=td(minutes=30))

def seq_job(sq_dt, **kwargs):
    for count, dt_in in enumerate(datetime_range(START_DT, END_DT, {'minutes':30}), 1):
        if sq_dt < str(dt_in):
            curr_seq = count, dt_in, dt_in + td(minutes=29, seconds=59)
            sequence = int(curr_seq[0])
            return sequence

pycall = PythonOperator(
    task_id='seq_sensor',
    provide_context=True,
    python_callable=seq_job,
    op_kwargs={'sq_dt': CURRENT_EXEC},
    dag=dag)

def group(grp, **context):
    sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor')
    grp = '%0.2d' % grp
    database = 'TEST'
    today_date = '{{ ds_nodash }}'
    return BashOperator(
           task_id='ETL_GRP{}_{}_{}'.format(database, sequence, grp),
           bash_command='script.sh {} {} {} {}'.format(today_date, sequence, database, grp), 
           dag=dag)

complete = DummyOperator(
        task_id='All_Sequences_complete',
        dag=dag)

pycall >> group(1) >> complete
pycall >> group(2) >> complete
pycall >> group(3) >> complete

The issue is that no matter what I try, I keep getting this error:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 263, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 66, in <module>
    pycall >> group(1) >> complete
  File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 56, in group
    sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor')
KeyError: 'task_instance'

Not sure if it's something small I am missing, or if I have everything wrong. I'm still new to Airflow and trying to set up our ETL test environment to run every 30 minutes with a unique sequence number, which is generated by datetime_range and is based on the execution_date variable.

Oleg Yamin asked Dec 21 '16 02:12




1 Answer

Try to use context['ti'] instead.
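
One thing worth checking: the traceback shows group(1) being called at module level while the DAG file is loaded, not by a running task. In that situation nothing is passed into **context, so it is an empty dict and any key lookup, 'task_instance' or 'ti', would raise KeyError. Below is a minimal sketch, without Airflow, that reproduces the empty context and shows one possible alternative: leaving the XCom lookup as a Jinja placeholder in the command string. The helper name build_bash_command is hypothetical and only for illustration.

```python
def group(grp, **context):
    # Same signature as in the question. When called directly,
    # nothing fills **context, so it is an empty dict.
    return sorted(context.keys())


# Called at module level, as in `pycall >> group(1) >> complete`.
parse_time_keys = group(1)


def build_bash_command(database, grp):
    """Hypothetical helper: build a command string with Jinja placeholders.

    The placeholders are left untouched here; they are only meaningful
    once a templated operator field renders them at run time.
    """
    return (
        "script.sh {{ ds_nodash }} "
        "{{ ti.xcom_pull(task_ids='seq_sensor') }} "
        + "{} {:02d}".format(database, grp)
    )


command = build_bash_command('TEST', 1)
print(parse_time_keys)
print(command)
```

If a string like this is passed as bash_command to a BashOperator, Airflow should render the placeholders when the task runs, since bash_command is a templated field. The task_id is not templated, so it would need a fixed name (for example 'ETL_GRP_TEST_01', an assumed name) rather than one containing the sequence number.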

Dmitri Safine answered Oct 10 '22 05:10
