 

Fusing operators together

Tags:

airflow

I'm still in the process of deploying Airflow, and I already feel the need to merge operators together. The most common use case would be coupling an operator with its corresponding sensor. For instance, one might want to chain together EmrAddStepsOperator and EmrStepSensor.


I'm creating my DAGs programmatically, and the biggest of them contains 150+ identical branches, each performing the same series of operations on a different piece of data (a table). Clubbing together the tasks that make up a single logical step in my DAG would therefore be a great help.

Here are 2 examples from my project that motivate the question.

1. Deleting data from S3 path and then writing new data

This step comprises 2 operators

  • DeleteS3PathOperator: Extends from BaseOperator & uses S3Hook
  • HadoopDistcpOperator: Extends from SSHOperator

2. Conditionally performing MSCK REPAIR on Hive table

This step contains 4 operators

  • BranchPythonOperator: Checks whether Hive table is partitioned
  • MsckRepairOperator: Extends from HiveOperator and performs MSCK REPAIR on (partitioned) table
  • Dummy(Branch)Operator: Makes up alternate branching path to MsckRepairOperator (for non-partitioned tables)
  • Dummy(Join)Operator: Makes up the join step for both branches
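
For illustration, these four tasks could collapse into one operator whose execute method does the branching in plain Python. This is only a minimal sketch under stated assumptions: `FakeHiveHook`, its `is_partitioned` and `run_hql` methods, and `ConditionalMsckRepairOperator` are hypothetical stand-ins written as plain classes so the snippet runs without Airflow. A real version would presumably extend BaseOperator and call an actual Hive hook.

```python
class FakeHiveHook:
    """Hypothetical stand-in for a Hive hook; records the HQL it is asked to run."""

    def __init__(self, partitioned_tables):
        self.partitioned_tables = set(partitioned_tables)
        self.executed = []

    def is_partitioned(self, table):
        # Assumption: a real hook would inspect the metastore here.
        return table in self.partitioned_tables

    def run_hql(self, hql):
        self.executed.append(hql)


class ConditionalMsckRepairOperator:
    """Hypothetical fused step: branch, repair and join inside one execute()."""

    def __init__(self, table, hook):
        self.table = table
        self.hook = hook

    def execute(self, context=None):
        # Replaces the BranchPythonOperator plus the two dummy tasks.
        if not self.hook.is_partitioned(self.table):
            return "skipped"
        self.hook.run_hql("MSCK REPAIR TABLE {}".format(self.table))
        return "repaired"


hook = FakeHiveHook(partitioned_tables=["events"])
results = {
    table: ConditionalMsckRepairOperator(table, hook).execute()
    for table in ["events", "lookup"]
}
```

The trade-off is visibility: the branch decision no longer shows up as a separate task in the DAG graph, only in the fused task's log or return value.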

Using operators in isolation certainly offers smaller modules and more fine-grained logging and debugging, but in large DAGs, reducing the clutter might be desirable. From my current understanding, there are 2 ways to chain operators together:

  1. Hooks

    Write the actual processing logic in hooks, and then use as many hooks as you want within a single operator (certainly the better way, in my opinion).

  2. SubDagOperator

    A risky and controversial way of doing things; additionally the naming convention for SubDagOperator makes me frown.
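
Option 1 might look roughly like the sketch below, using the delete-then-write step from example 1. Everything here is a stand-in: `InMemoryS3Hook`, `FakeDistcpHook` and `ReplaceS3PathOperator` are hypothetical names invented for illustration, written as plain Python classes so the sketch runs without Airflow. In a real DAG the operator would presumably extend BaseOperator and delegate to real hooks.

```python
class InMemoryS3Hook:
    """Hypothetical stand-in for an S3 hook, backed by a dict of key -> bytes."""

    def __init__(self, objects=None):
        self.objects = dict(objects or {})

    def list_keys(self, prefix):
        return sorted(k for k in self.objects if k.startswith(prefix))

    def delete_prefix(self, prefix):
        for key in self.list_keys(prefix):
            del self.objects[key]


class FakeDistcpHook:
    """Hypothetical stand-in for the copy step (distcp over SSH in the question)."""

    def __init__(self, s3_hook):
        self.s3_hook = s3_hook

    def copy(self, source_files, dest_prefix):
        for name, data in source_files.items():
            self.s3_hook.objects[dest_prefix + name] = data


class ReplaceS3PathOperator:
    """Hypothetical fused operator: delete the target path, then write new data."""

    def __init__(self, dest_prefix, source_files, s3_hook, distcp_hook):
        self.dest_prefix = dest_prefix
        self.source_files = source_files
        self.s3_hook = s3_hook
        self.distcp_hook = distcp_hook

    def execute(self, context=None):
        # Both logical steps run inside a single task.
        self.s3_hook.delete_prefix(self.dest_prefix)
        self.distcp_hook.copy(self.source_files, self.dest_prefix)
        return self.s3_hook.list_keys(self.dest_prefix)


s3 = InMemoryS3Hook({"tables/users/stale.parquet": b"old"})
task = ReplaceS3PathOperator(
    dest_prefix="tables/users/",
    source_files={"part-0.parquet": b"new"},
    s3_hook=s3,
    distcp_hook=FakeDistcpHook(s3),
)
first_run = task.execute()
second_run = task.execute()
```

Because the delete always precedes the write, running the task a second time leaves the target path in the same state as the first run.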


My questions are

  • Should operators be composed at all or is it better to have discrete steps?
  • Any pitfalls, improvements in above approaches?
  • Any other ways to combine operators together?
  • In taxonomy of Airflow, is the primary motive of Hooks same as above, or do they serve some other purposes too?

UPDATE-1

3. Multiple Inheritance

While this is a Python feature rather than something Airflow-specific, it's worthwhile to point out that multiple inheritance can come in handy for combining the functionality of operators. QuboleCheckOperator, for instance, is already written that way. In the past, however, I tried this to fuse EmrCreateJobFlowOperator and EmrJobFlowSensor, but at the time I ran into issues with the @apply_defaults decorator and abandoned the idea.

y2k-shubham asked Nov 14 '18 20:11


2 Answers

I have combined various hooks to create a single operator based on my needs. A simple example: I combined the GCS hook's delete, copy, list and get_size methods into a single operator called GcsDataValidationOperator. A rule of thumb is to keep the operator idempotent, i.e. if you run it multiple times, it should produce the same result.

Should operators be composed at all or is it better to have discrete steps?

The only pitfall is maintainability: when the hooks change in the master branch, you will need to update all your operators manually if there are any breaking changes.

Any pitfalls, improvements in above approaches?

You can use PythonOperator and use the built-in hooks with the .execute method, but that would still mean a lot of detail in the DAG file. Hence, I would still go for the new-operator approach.

Any other ways to combine operators together?

Hooks are just interfaces to external platforms and databases such as Hive, GCS, etc., and they form the building blocks for operators. This allows the creation of new operators. It also means you can customize templated fields, add Slack notifications on each granular step inside your new operator, and have your own logging details.

In taxonomy of Airflow, is the primary motive of Hooks same as above, or do they serve some other purposes too?

FWIW: I am a PMC member and a contributor to the Airflow project.

kaxil answered Dec 25 '22 22:12


This is not a generic answer to the question (although I think the approach is generic enough that it can be extended to other operators and sensors). It just shows how EmrCreateJobFlowOperator and EmrJobFlowSensor can be fused together using Python's multiple inheritance.

Code for Fused Operator and Sensor

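# Necessary imports (Airflow 1.10.x contrib module paths; adjust for other versions)
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
from airflow.utils.decorators import apply_defaults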

class EmrStartCluster(EmrCreateJobFlowOperator, EmrJobFlowSensor):
    ui_color = "#c70039"
    NON_TERMINAL_STATES = ["STARTING", "BOOTSTRAPPING"]

    @apply_defaults
    def __init__(self, *args, **kwargs):
        kwargs["job_flow_id"] = None  # We do this, because EmrJobFlowSensor requires `job_flow_id` to be initialized
        super(EmrStartCluster, self).__init__(*args, **kwargs)

    def execute(self, context):
        self.job_flow_id = super(EmrStartCluster, self).execute(context)
        super(EmrJobFlowSensor, self).execute(context)
        return self.job_flow_id
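
The two `super(...)` calls above rely on Python's method resolution order (MRO). The following is a minimal sketch of that mechanism only, not Airflow code: every class here (`BaseSensor`, `CreateClusterOperator`, `ClusterSensor`, `StartCluster`) is a hypothetical stand-in, and it assumes the sensor inherits its execute method (the poke loop) from a base sensor class.

```python
class BaseOperator:
    def execute(self, context):
        raise NotImplementedError


class BaseSensor(BaseOperator):
    """Stand-in sensor base: execute() loops until poke() returns True."""

    def execute(self, context):
        while not self.poke(context):
            pass  # a real sensor would sleep between pokes


class CreateClusterOperator(BaseOperator):
    def execute(self, context):
        context["calls"].append("create")
        return "j-HYPOTHETICAL"  # made-up job flow id


class ClusterSensor(BaseSensor):
    def __init__(self, job_flow_id=None):
        self.job_flow_id = job_flow_id
        self.pokes_left = 2  # pretend the cluster is ready on the second poke

    def poke(self, context):
        context["calls"].append("poke:{}".format(self.job_flow_id))
        self.pokes_left -= 1
        return self.pokes_left <= 0


class StartCluster(CreateClusterOperator, ClusterSensor):
    def execute(self, context):
        # Next class after StartCluster in the MRO: CreateClusterOperator.
        self.job_flow_id = super(StartCluster, self).execute(context)
        # Next class after ClusterSensor in the MRO: BaseSensor (the poke loop).
        super(ClusterSensor, self).execute(context)
        return self.job_flow_id


mro_names = [cls.__name__ for cls in StartCluster.__mro__]
context = {"calls": []}
result = StartCluster().execute(context)
```

In this sketch the creation step runs first, and the poke loop then sees the job flow id that the creation step returned, which is the ordering the fused operator depends on.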

The fused operator can be invoked like:

JOB_FLOW_OVERRIDES = {} ## Job flow config goes here
dag = DAG() # Dag invocation goes here

cluster_creator = EmrStartCluster(
    dag=dag,
    task_id='start_cluster',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default'
)

I have tested it with EmrStepSensor, EmrAddStepsOperator, and EmrTerminateJobFlowOperator and so far have had no issues.

Amit Singh answered Dec 26 '22 00:12