I'm still in the process of deploying Airflow and I've already felt the need to merge operators together. The most common use-case would be coupling an operator and the corresponding sensor. For instance, one might want to chain together the EmrStepOperator and EmrStepSensor.
I'm creating my DAGs programmatically, and the biggest one of those contains 150+ (identical) branches, each performing the same series of operations on different bits of data (tables); a minimal sketch of this fan-out is shown below. Therefore, clubbing together tasks that make up a single logical step in my DAG would be of great help.
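For concreteness, here is what that kind of programmatic fan-out could look like (a minimal sketch; the DAG name, table names, and task contents are illustrative stand-ins):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('tables_pipeline', start_date=datetime(2019, 1, 1), schedule_interval='@daily')
start = DummyOperator(task_id='start', dag=dag)

for table in ['orders', 'users', 'events']:  # in reality, 150+ tables
    # each branch repeats the same series of steps on a different table
    process = DummyOperator(task_id='process_{}'.format(table), dag=dag)
    start >> process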
Here are 2 motivating examples from my project to support my argument.
1. Deleting data from an S3 path and then writing new data

This step comprises 2 operators:

- DeleteS3PathOperator: Extends BaseOperator and uses S3Hook (a sketch of this one follows the list)
- HadoopDistcpOperator: Extends SSHOperator
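A minimal sketch of what the first of these could look like, assuming the Airflow 1.10.x S3Hook API; the operator name comes from the list above, while the fields and defaults are illustrative:

from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class DeleteS3PathOperator(BaseOperator):
    template_fields = ('bucket', 'prefix')

    @apply_defaults
    def __init__(self, bucket, prefix, aws_conn_id='aws_default', *args, **kwargs):
        super(DeleteS3PathOperator, self).__init__(*args, **kwargs)
        self.bucket = bucket
        self.prefix = prefix
        self.aws_conn_id = aws_conn_id

    def execute(self, context):
        # delete every object under the given prefix via S3Hook
        hook = S3Hook(aws_conn_id=self.aws_conn_id)
        keys = hook.list_keys(bucket_name=self.bucket, prefix=self.prefix) or []
        if keys:
            hook.delete_objects(bucket=self.bucket, keys=keys)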
2. Conditionally performing MSCK REPAIR on a Hive table

This step comprises 4 operators (the branch/join wiring is sketched after the list):

- BranchPythonOperator: Checks whether the Hive table is partitioned
- MsckRepairOperator: Extends HiveOperator and performs MSCK REPAIR on the (partitioned) table
- Dummy(Branch)Operator: Makes up the alternate branching path to MsckRepairOperator (for non-partitioned tables)
- Dummy(Join)Operator: Makes up the join step for both branches
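A hedged sketch of that wiring, assuming Airflow 1.10.x; the partition check and table name are stand-ins (a real check would query the Hive metastore):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.hive_operator import HiveOperator
from airflow.operators.python_operator import BranchPythonOperator

class MsckRepairOperator(HiveOperator):
    # the custom operator from the list above: delegates the actual work to HiveOperator
    def __init__(self, table, *args, **kwargs):
        super(MsckRepairOperator, self).__init__(
            hql='MSCK REPAIR TABLE {}'.format(table), *args, **kwargs)

def choose_path():
    table_is_partitioned = True  # stand-in: a real check would query the metastore
    return 'msck_repair' if table_is_partitioned else 'skip_repair'

dag = DAG('msck_example', start_date=datetime(2019, 1, 1))
check = BranchPythonOperator(task_id='check_partitioned', python_callable=choose_path, dag=dag)
repair = MsckRepairOperator(task_id='msck_repair', table='my_table', dag=dag)
skip = DummyOperator(task_id='skip_repair', dag=dag)                        # Dummy(Branch)Operator
join = DummyOperator(task_id='join', trigger_rule='one_success', dag=dag)   # Dummy(Join)Operator

check >> [repair, skip]
repair >> join
skip >> join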
Using operators in isolation certainly offers smaller modules and more fine-grained logging / debugging, but in large DAGs, reducing the clutter might be desirable. From my current understanding, there are 2 ways to chain operators together:
1. Hooks: Write the actual processing logic in hooks and then use as many hooks as you want within a single operator (certainly the better way, in my opinion).
2. SubDagOperator: A risky and controversial way of doing things; additionally, the naming convention for SubDagOperator makes me frown (see the sketch of that convention below).
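For reference, the naming convention in question: the sub-DAG's dag_id must be '<parent_dag_id>.<task_id>' of the enclosing SubDagOperator. A minimal sketch, assuming Airflow 1.10.x (all names illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

def make_subdag(parent_dag_id, task_id, start_date):
    # the child dag_id MUST be '<parent_dag_id>.<task_id>' -- the convention in question
    subdag = DAG('{}.{}'.format(parent_dag_id, task_id), start_date=start_date)
    DummyOperator(task_id='inner_step', dag=subdag)
    return subdag

main = DAG('main_dag', start_date=datetime(2019, 1, 1), schedule_interval='@daily')
section = SubDagOperator(task_id='section_1',
                         subdag=make_subdag('main_dag', 'section_1', main.start_date),
                         dag=main)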
My questions are:

- Should operators be composed at all, or is it better to have discrete steps?
- Any pitfalls, improvements in the above approaches?
- Any other ways to combine operators together?
- In the taxonomy of Airflow, is the primary motive of Hooks the same as above, or do they serve some other purposes too?
UPDATE-1
3. Multiple Inheritance

While this is a Python feature rather than something Airflow-specific, it's worthwhile to point out that multiple inheritance can come in handy for combining the functionality of operators. QuboleCheckOperator, for instance, is already written using it. However, in the past I tried this to fuse EmrCreateJobFlowOperator and EmrJobFlowSensor, but at the time I ran into issues with the @apply_defaults decorator and abandoned the idea.
Should operators be composed at all, or is it better to have discrete steps?

I have combined various hooks to create a single operator based on my needs. A simple example: I clubbed the GCS hook's delete, copy, list, and get_size methods together to create a single operator called GcsDataValidationOperator; a hedged sketch of what that could look like follows. A rule of thumb is to preserve idempotency, i.e. running it multiple times should produce the same result.
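A minimal sketch of such an operator, assuming the Airflow 1.10.x contrib GoogleCloudStorageHook; the hook methods are real, the validation rule itself is an illustrative stand-in:

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class GcsDataValidationOperator(BaseOperator):
    @apply_defaults
    def __init__(self, bucket, prefix,
                 google_cloud_storage_conn_id='google_cloud_default', *args, **kwargs):
        super(GcsDataValidationOperator, self).__init__(*args, **kwargs)
        self.bucket = bucket
        self.prefix = prefix
        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id

    def execute(self, context):
        hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id)
        # several hook methods (list, get_size, ...) combined behind one task;
        # re-running on the same inputs yields the same outcome (idempotency)
        for obj in hook.list(self.bucket, prefix=self.prefix):
            if hook.get_size(self.bucket, obj) == 0:
                raise ValueError('Empty object found: gs://{}/{}'.format(self.bucket, obj))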
Any pitfalls, improvements in the above approaches?

The only pitfall is maintainability: when the hooks change in the master branch, you will need to update all your operators manually if there are any breaking changes.
Any other ways to combine operators together?

You can use PythonOperator and drive the in-built hooks (or an existing operator's .execute method) from the callable, but it would still mean a lot of details in the DAG file. Hence, I would still go for the new-operator approach. A sketch of the PythonOperator route follows.
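A minimal sketch of that route, assuming the Airflow 1.10.x S3Hook; bucket, prefix, and task names are illustrative:

from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator

def delete_staging_data(**context):
    # the same logic a custom operator would hold, just inlined into the DAG file
    hook = S3Hook(aws_conn_id='aws_default')
    keys = hook.list_keys(bucket_name='my-bucket', prefix='staging/') or []
    if keys:
        hook.delete_objects(bucket='my-bucket', keys=keys)

cleanup = PythonOperator(task_id='delete_staging_data',
                         python_callable=delete_staging_data,
                         provide_context=True,
                         dag=dag)  # assumes an existing `dag` object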
In the taxonomy of Airflow, is the primary motive of Hooks the same as above, or do they serve some other purposes too?

Hooks are just interfaces to external platforms and databases like Hive, GCS, etc., and form the building blocks for operators. This allows the creation of new operators. It also means you can customize templated fields, add Slack notifications on each granular step inside your new operator, and have your own logging details.
FWIW: I am a PMC member and a contributor of the Airflow project.
This is not a generic answer to the question (although I think the approach is generic enough that it can be extended to other operators and sensors). It just shows how EmrCreateJobFlowOperator and EmrJobFlowSensor can be fused together using Python's multiple inheritance.
Code for Fused Operator and Sensor
# Imports (paths assume the Airflow 1.10.x contrib layout)
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
from airflow.utils.decorators import apply_defaults


class EmrStartCluster(EmrCreateJobFlowOperator, EmrJobFlowSensor):
    ui_color = "#c70039"
    NON_TERMINAL_STATES = ["STARTING", "BOOTSTRAPPING"]

    @apply_defaults
    def __init__(self, *args, **kwargs):
        # EmrJobFlowSensor requires `job_flow_id` to be initialized; it is
        # filled in during execute() once the cluster has been created
        kwargs["job_flow_id"] = None
        super(EmrStartCluster, self).__init__(*args, **kwargs)

    def execute(self, context):
        # first in the MRO: EmrCreateJobFlowOperator.execute, which returns
        # the id of the newly created job flow
        self.job_flow_id = super(EmrStartCluster, self).execute(context)
        # skipping past EmrJobFlowSensor in the MRO reaches the sensor base
        # class's execute, i.e. the poke loop that waits for the cluster
        super(EmrJobFlowSensor, self).execute(context)
        return self.job_flow_id
The fused operator can be invoked like:
JOB_FLOW_OVERRIDES = {}  # job flow config goes here
dag = DAG(...)  # DAG instantiation goes here

cluster_creator = EmrStartCluster(
    dag=dag,
    task_id='start_cluster',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
)
I have tested it with EmrStepSensor, EmrAddStepsOperator, and EmrTerminateJobFlowOperator, and so far have had no issues.