
Airflow error importing DAG using plugin - Relationships can only be set between Operators

I have written an Airflow plugin that contains a single custom operator (to support CMEK in BigQuery). I can create a simple DAG with a single task that uses this operator, and it executes fine.

However, if I try to create a dependency in the DAG from a DummyOperator task to my custom operator task, the DAG fails to load in the UI and throws the following error. I can't understand why this error is being thrown.

Broken DAG: [/home/airflow/gcs/dags/js_bq_custom_plugin_v2.py] Relationships can only be set between Operators; received BQCMEKOperator

I have tested this so far on composer-1.4.2-airflow-1.9.0, composer-1.4.2-airflow-1.10.0 and composer-1.4.1-airflow-1.10.0.

Running airflow test for each of the tasks completes without error.

Using it in isolation in a DAG works fine (as below), so I don't believe there is anything inherently wrong with the plugin:

import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator


default_dag_args = {
    'start_date': datetime.datetime(2019,1,1),
    'retries': 0
}


dag = DAG(
    'js_bq_custom_plugin',
    schedule_interval=None,
    catchup=False,
    concurrency=1,
    max_active_runs=1,
    default_args=default_dag_args)

run_this = BQCMEKOperator(
    task_id     = 'cmek_plugin_test',
    sql         = 'select * from ds.foo LIMIT 15',
    project     = 'xxx',
    dataset     = 'js_dev',
    table       = 'cmek_test10',
    key         = 'xxx',
    dag         = dag
)

Whereas if I introduce a DummyOperator and a dependency, the error occurs:

import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator
from airflow.operators.dummy_operator import DummyOperator

default_dag_args = {
    'start_date': datetime.datetime(2019,1,1),
    'retries': 0
}

dag = DAG(
    'js_bq_custom_plugin_v2',
    schedule_interval=None,
    catchup=False,
    concurrency=1,
    max_active_runs=1,
    default_args=default_dag_args)

etl_start = DummyOperator(task_id='etl_start', dag=dag)

extract = BQCMEKOperator(
    task_id     = 'extract',
    sql         = 'select * from foo.bar LIMIT 15',
    project     = 'xxx',
    dataset     = 'js_dev',
    table       = 'cmek_test5',
    key         = 'xxx',
    dag         = dag
)

etl_start.set_downstream(extract)

The operator itself is straightforward, and I can reproduce the issue with the simplest custom operator, such as the one below:

import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class TestOperator(BaseOperator):

    @apply_defaults
    def __init__(self,
                *args,
                **kwargs):
        super(TestOperator, self).__init__(*args, **kwargs)


    def execute(self, context):
        logging.info("Executed by TestOperator")

With the following plugin definition in __init__.py:

from airflow.plugins_manager import AirflowPlugin
from test_plugin.operators.test_operator import TestOperator

class TestPlugin(AirflowPlugin):
    name = "test_plugin"
    operators = [TestOperator]
    hooks = []
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []

I have also looked at the Airflow code in models.py that generates this error. It uses isinstance(t, BaseOperator), and that check returns True for my task using my custom operator when I just run it in Python, so I have no idea what is going on.

for t in task_list:
    if not isinstance(t, BaseOperator):
        raise AirflowException(
            "Relationships can only be set between "
            "Operators; received {}".format(t.__class__.__name__))
Jasper Smith asked Jan 23 '19 21:01



1 Answer

A bug was introduced in the composer-1.4.2 release, which we have now fixed. Try creating a new Composer environment and that DAG error should go away. Meanwhile, we'll also apply the fix automatically to existing 1.4.2 environments over the next few days.
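
The answer does not say what the bug was. For anyone who sees the same message elsewhere, one general way an isinstance check can reject an object whose class name looks right is that the base class has been loaded twice, giving two distinct class objects with the same name. The sketch below reproduces that in plain Python, without Airflow. The module names and the load_copy and describe helpers are hypothetical, and this is only an assumption about a possible cause, not a confirmed explanation of the Composer bug.

```python
import types

# Stand-in for a module that defines a base class (hypothetical, not Airflow's code).
SOURCE = """
class BaseOperator(object):
    pass
"""


def load_copy(module_name):
    """Execute SOURCE in a fresh module object, like a second load of the same file."""
    module = types.ModuleType(module_name)
    exec(SOURCE, module.__dict__)
    return module


# Two independent loads of the same source produce two distinct class objects.
first = load_copy("models_first_load")
second = load_copy("models_second_load")


class TestOperator(first.BaseOperator):
    """Custom operator built on the FIRST copy of the base class."""


def describe(obj):
    """List (module, class name) for every class in the object's MRO."""
    return [(cls.__module__, cls.__name__) for cls in type(obj).__mro__]


task = TestOperator()

same_base = isinstance(task, first.BaseOperator)    # checked against the copy it inherits from
other_base = isinstance(task, second.BaseOperator)  # checked against the other copy

print(same_base, other_base)
print(describe(task))
```

Here same_base is True and other_base is False, even though both base classes are named BaseOperator. Printing something like describe(task) from inside the failing DAG file, next to the module of the BaseOperator being checked, is one way to see whether the two actually match.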

Feng Lu answered Oct 22 '22 05:10
