In Airflow, I need to pass the job_flow_id to one of my EMR steps. I can retrieve the job_flow_id from the operator, but when I create the steps to submit to the cluster, the task_instance template value is not rendered correctly.
I have the following code:
from datetime import timedelta

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor


def issue_step(name, args):
    return [
        {
            "Name": name,
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "s3://....",
                "Args": args
            }
        }
    ]
dag = DAG('example',
          description='My dag',
          schedule_interval='0 8 * * 6',
          dagrun_timeout=timedelta(days=2))
try:
    create_emr = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        aws_conn_id='aws_default',
        dag=dag
    )

    load_data_steps = issue_step('load', ['arg1', 'arg2'])
    load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
    load_data_steps[0]["HadoopJarStep"]["Args"].append(
        "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}")  # the value here is not exchanged with the actual job_flow_id

    load_data = EmrAddStepsOperator(
        task_id='load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",  # this is correctly exchanged with the job_flow_id - same for the others
        aws_conn_id='aws_default',
        steps=load_data_steps,
        dag=dag
    )

    check_load_data = EmrStepSensor(
        task_id='watch_load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        dag=dag
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        dag=dag
    )
    create_emr >> load_data
    load_data >> check_load_data
    check_load_data >> cluster_remover

except AirflowException as ae:
    print(ae.message)
The problem is that, when I check EMR, instead of seeing --cluster-id j-1234 in the load_data step, I see --cluster-id "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", which causes my step to fail.
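In other words, the Args list that reaches EMR still contains the raw template instead of the rendered cluster id; schematically:

    # What actually arrives in the step definition (template passed through as-is):
    ['arg1', 'arg2', '--cluster-id',
     "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}"]

    # What I expected:
    ['arg1', 'arg2', '--cluster-id', 'j-1234']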
How can I get the actual value inside my step function?
Thanks, and happy holidays!
I found out that there is a PR on the Airflow repository about this. The issue is that there is no templating for the steps field in the EmrAddStepsOperator. To overcome this, I created a custom operator that inherits from EmrAddStepsOperator and overrides its template_fields.
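You can check this against your installed version; if 'steps' is missing from the operator's template_fields, Jinja templates inside the step dicts are never rendered:

    # Quick diagnostic (import path is the Airflow 1.x contrib location):
    from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator

    print(EmrAddStepsOperator.template_fields)
    # On affected versions this prints only ['job_flow_id'] -- no 'steps'.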
Here is the code for the custom operator and the plugin, in the file custom_emr_add_step_operator.py (see the tree below):
from __future__ import division, absolute_import, print_function

from airflow.plugins_manager import AirflowPlugin
from airflow.utils import apply_defaults
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator


class CustomEmrAddStepsOperator(EmrAddStepsOperator):
    template_fields = ['job_flow_id', 'steps']  # override with steps to solve the issue above

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(CustomEmrAddStepsOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        super(CustomEmrAddStepsOperator, self).execute(context=context)


# Defining the plugin class
class CustomPlugin(AirflowPlugin):
    name = "custom_plugin"
    operators = [CustomEmrAddStepsOperator]
In my DAG file I then import the operator through the plugin in this way:

from airflow.operators import CustomEmrAddStepsOperator
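With that import in place, the EmrAddStepsOperator from the question can simply be swapped for the custom class; a minimal sketch, reusing the names from the question's DAG:

    # Only the operator class changes; because 'steps' is now a template field,
    # the --cluster-id argument appended to load_data_steps is rendered to the
    # real job flow id at runtime.
    load_data = CustomEmrAddStepsOperator(
        task_id='load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=load_data_steps,
        dag=dag
    )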
The structure of my project and plugins looks like this:
├── config
│   └── airflow.cfg
├── dags
│   ├── __init__.py
│   └── my_dag.py
├── plugins
│   ├── __init__.py
│   └── operators
│       ├── __init__.py
│       └── custom_emr_add_step_operator.py
└── requirements.txt
If you are using an IDE such as PyCharm, it will complain that it cannot find the module, but the import resolves fine when you actually run Airflow.
Also remember to make sure that your airflow.cfg points to the right plugins folder, so that Airflow is able to read your newly created plugin.
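For example, with the layout above, the [core] section of airflow.cfg would contain an entry along these lines (the absolute path is an assumption about where the project lives):

    [core]
    # must point at the plugins directory from the tree above
    plugins_folder = /home/user/my_project/plugins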