
airflow.exceptions.AirflowException: Use keyword arguments when initializing operators

Tags: airflow

I am using Airflow 1.9.2 with Python 2.7 on Ubuntu. I have a ParentOperator class that works fine on its own, and I tried to subclass it as ChildOperator. When I create a ChildOperator instance I get the error below; I suspect some keyword arguments are missing or being passed incorrectly:

airflow.exceptions.AirflowException: Use keyword arguments when initializing operators

Here is a simplified example:

class ParentOperator(BaseOperator, SkipMixin):

    @apply_defaults
    def __init__(self,
             conn_id,
             object,
             args={},
             s3_conn_id=None,
             s3_key=None,
             s3_bucket=None,
             fields=None,
             *args,
             **kwargs
             ):

        super(ParentOperator, self).__init__(*args, **kwargs)
        ...


class ChildOperator(ParentOperator):

    @apply_defaults
    def __init__(self,
             conn_id,
             object,
             args={},
             s3_conn_id=None,
             s3_key=None,
             s3_bucket=None,
             fields=None,
             *args,
             **kwargs
             ):

        args=...
        super(ChildOperator, self).__init__(
            conn_id,
            object,
            args=args,
            s3_conn_id=s3_conn_id,
            s3_key=s3_key,
            s3_bucket=s3_bucket,
            fields=fields,
            *args,
            **kwargs
        )

...

myobjc = ChildOperator(
    conn_id="my_default",
    object=table,
    args={},
    s3_conn_id='s3_postgres_dump',
    s3_key=s3_key,
    s3_bucket=s3_bucket,
    dag=dag,
    task_id="task1"
)

Any idea what is causing this error? Is this more of a Python-specific issue?

asked Aug 03 '18 22:08 by kee


1 Answer

The __init__ of ChildOperator must pass every argument to the parent constructor as a keyword argument, including the first two, conn_id and object. Passing those two positionally is what triggers the exception:

super(ChildOperator, self).__init__(
        conn_id=conn_id,
        object=object,
        args=args,
        s3_conn_id=s3_conn_id,
        s3_key=s3_key,
        s3_bucket=s3_bucket,
        fields=fields,
        *args,
        **kwargs
)
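
To see why the positional call fails, here is a minimal sketch that runs without Airflow. It assumes the check inside apply_defaults amounts to "raise if anything besides self arrives positionally", which is how the 1.x decorator appears to behave. The names keyword_only, OperatorInitError, Base, Parent, BrokenChild and FixedChild are hypothetical stand-ins, not Airflow APIs. The dict parameter from the question is left out, because Python does not allow a parameter named args next to *args in the same signature.

```python
import functools


class OperatorInitError(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowException."""


def keyword_only(func):
    """Hypothetical stand-in for the positional-argument check in apply_defaults."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # args[0] is self; anything after it was passed positionally.
        if len(args) > 1:
            raise OperatorInitError(
                "Use keyword arguments when initializing operators")
        return func(*args, **kwargs)
    return wrapper


class Base(object):
    @keyword_only
    def __init__(self, task_id=None):
        self.task_id = task_id


class Parent(Base):
    @keyword_only
    def __init__(self, conn_id, object, fields=None, *args, **kwargs):
        super(Parent, self).__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.object = object
        self.fields = fields


class BrokenChild(Parent):
    @keyword_only
    def __init__(self, conn_id, object, fields=None, *args, **kwargs):
        # conn_id and object go up positionally, so Parent's wrapper
        # receives (self, conn_id, object) and raises.
        super(BrokenChild, self).__init__(
            conn_id, object, fields=fields, *args, **kwargs)


class FixedChild(Parent):
    @keyword_only
    def __init__(self, conn_id, object, fields=None, *args, **kwargs):
        # Everything goes up by keyword, so Parent's wrapper only sees self.
        super(FixedChild, self).__init__(
            conn_id=conn_id, object=object, fields=fields, *args, **kwargs)
```

With these definitions, BrokenChild(conn_id="c", object="t", task_id="task1") raises OperatorInitError even though the caller used only keyword arguments, because the positional values are introduced inside the child's own super() call. FixedChild with the same arguments constructs normally.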
answered Nov 17 '22 00:11 by kee