
Airflow - Task Instance in EMR operator

In Airflow, I'm facing the issue that I need to pass the job_flow_id to one of my EMR steps. I can retrieve the job_flow_id from the operator, but when I create the steps to submit to the cluster, the task_instance template is not rendered. I have the following code:

def issue_step(name, args):
    return [
        {
            "Name": name,
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "s3://....",
                "Args": args
            }
        }
    ]

dag = DAG('example',
          description='My dag',
          schedule_interval='0 8 * * 6',
          dagrun_timeout=timedelta(days=2))

try:

    create_emr = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        aws_conn_id='aws_default',        
        dag=dag
    )

    load_data_steps = issue_step('load', ['arg1', 'arg2'])

    load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
    load_data_steps[0]["HadoopJarStep"]["Args"].append(
        "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}") # the value here is not exchanged with the actual job_flow_id

    load_data = EmrAddStepsOperator(
        task_id='load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",  # this is correctly exchanged with the job_flow_id - same for the others
        aws_conn_id='aws_default',
        steps=load_data_steps,
        dag=dag
    )

    check_load_data = EmrStepSensor(
        task_id='watch_load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        dag=dag
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        dag=dag
    )

    create_emr >> load_data
    load_data >> check_load_data
    check_load_data >> cluster_remover

except AirflowException as ae:
    print(ae.message)

The problem is that when I check the EMR console, instead of seeing --cluster-id j-1234 in the load_data step, I see --cluster-id "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", which causes my step to fail.

How can I get the actual value inside my step function?

Thanks and happy holidays



1 Answer

I found out that there is a PR on the Airflow repository about this. The issue is that there is no templating for the steps in the EmrAddStepsOperator. To overcome this, I did the following:

  • Created a custom operator that inherits from EmrAddStepsOperator
  • Added this operator as a plugin
  • Imported the new operator in my DAG file

Here is the code for the custom operator and the plugin, in the file custom_emr_add_step_operator.py (see the tree below):

from __future__ import division, absolute_import, print_function

from airflow.plugins_manager import AirflowPlugin
from airflow.utils import apply_defaults

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator


class CustomEmrAddStepsOperator(EmrAddStepsOperator):
    template_fields = ['job_flow_id', 'steps']  # override to add 'steps', so the step definitions are templated too

    @apply_defaults
    def __init__(
            self,
            *args, **kwargs):
        super(CustomEmrAddStepsOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        super(CustomEmrAddStepsOperator, self).execute(context=context)


# Defining the plugin class
class CustomPlugin(AirflowPlugin):
    name = "custom_plugin"
    operators = [CustomEmrAddStepsOperator]

In my DAG file I import the operator from the plugin in this way:

from airflow.operators import CustomEmrAddStepsOperator
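
With the plugin in place, the load_data task from the question can switch to the custom operator; because steps is now a templated field, the xcom_pull expression inside the step args gets rendered to the real job flow ID at run time. A minimal sketch, reusing the dag, issue_step helper, and task IDs from the question:

from airflow.operators import CustomEmrAddStepsOperator

load_data_steps = issue_step('load', ['arg1', 'arg2'])
load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
load_data_steps[0]["HadoopJarStep"]["Args"].append(
    "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}")

load_data = CustomEmrAddStepsOperator(
    task_id='load_data',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=load_data_steps,  # templated now, so the xcom_pull above renders to e.g. j-1234
    dag=dag
)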

The structure of my project and plugins looks like this:

├── config
│   └── airflow.cfg
├── dags
│   ├── __init__.py
│   └── my_dag.py
├── plugins
│   ├── __init__.py
│   └── operators
│       ├── __init__.py
│       └── custom_emr_add_step_operator.py
└── requirements.txt

If you are using an IDE such as PyCharm, it will complain that it cannot find the module, but the import works when Airflow runs. Also make sure that your airflow.cfg points to the right plugins folder so that Airflow can pick up your newly created plugin.
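
For reference, that setting lives under [core] in airflow.cfg; a minimal illustrative snippet (the path is an assumption, adjust it to your own layout):

[core]
# must point at the plugins directory shown in the tree above
plugins_folder = /path/to/project/plugins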
