I am using Airflow 1.9.2 with Python 2.7 on Ubuntu. I tried to inherit from a ParentOperator class, which works fine on its own, to create a class called ChildOperator. But when I create a ChildOperator instance, I think some keyword arguments are missing or mixed up, and I get this error:
airflow.exceptions.AirflowException: Use keyword arguments when initializing operators
Here is a simplified example:
class ParentOperator(BaseOperator, SkipMixin):
    @apply_defaults
    def __init__(self,
                 conn_id,
                 object,
                 args={},
                 s3_conn_id=None,
                 s3_key=None,
                 s3_bucket=None,
                 fields=None,
                 *args,
                 **kwargs
                 ):
        super(ParentOperator, self).__init__(*args, **kwargs)
        ...
class ChildOperator(ParentOperator):
    @apply_defaults
    def __init__(self,
                 conn_id,
                 object,
                 args={},
                 s3_conn_id=None,
                 s3_key=None,
                 s3_bucket=None,
                 fields=None,
                 *args,
                 **kwargs
                 ):
        args=...
        super(ChildOperator, self).__init__(
            conn_id,
            object,
            args=args,
            s3_conn_id=s3_conn_id,
            s3_key=s3_key,
            s3_bucket=s3_bucket,
            fields=fields,
            *args,
            **kwargs
        )
        ...
myobjc = ChildOperator(
    conn_id="my_default",
    object=table,
    args={},
    s3_conn_id='s3_postgres_dump',
    s3_key=s3_key,
    s3_bucket=s3_bucket,
    dag=dag,
    task_id="task1"
)
Any idea what is causing this error? Is this more of a Python-specific issue?
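The __init__ function of ChildOperator needs to pass every parameter to the parent as a keyword argument, including the first two, conn_id and object. The parent's __init__ is wrapped by apply_defaults, which raises this exception whenever it receives any positional argument besides self. The call should look like the following: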
super(ChildOperator, self).__init__(
    conn_id=conn_id,
    object=object,
    args=args,
    s3_conn_id=s3_conn_id,
    s3_key=s3_key,
    s3_bucket=s3_bucket,
    fields=fields,
    *args,
    **kwargs
)
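To see why the positional call fails, here is a minimal sketch that runs without Airflow. The keyword_only decorator and KeywordOnlyError are hypothetical stand-ins for apply_defaults and AirflowException; they only imitate the positional-argument check and none of the default-argument handling. The Parent, BrokenChild, FixedChild and try_create names are likewise made up for illustration.

```python
import functools


class KeywordOnlyError(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowException."""


def keyword_only(func):
    """Simplified stand-in for apply_defaults: reject positional arguments."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # args[0] is self; anything beyond it was passed positionally.
        if len(args) > 1:
            raise KeywordOnlyError(
                "Use keyword arguments when initializing operators")
        return func(*args, **kwargs)
    return wrapper


class Parent(object):
    @keyword_only
    def __init__(self, conn_id, object, fields=None):
        self.conn_id = conn_id
        self.object = object
        self.fields = fields


class BrokenChild(Parent):
    @keyword_only
    def __init__(self, conn_id, object, fields=None):
        # Positional forwarding trips the check in the parent's wrapper.
        super(BrokenChild, self).__init__(conn_id, object, fields=fields)


class FixedChild(Parent):
    @keyword_only
    def __init__(self, conn_id, object, fields=None):
        # Keyword forwarding passes the check.
        super(FixedChild, self).__init__(
            conn_id=conn_id, object=object, fields=fields)


def try_create(cls):
    """Return the created instance, or the error message if creation fails."""
    try:
        return cls(conn_id="my_default", object="my_table")
    except KeywordOnlyError as exc:
        return str(exc)
```

Calling try_create(BrokenChild) returns the error message even though the caller used only keyword arguments, because the failure happens inside the child's own super() call. try_create(FixedChild) returns a normal instance.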