
How to retrieve a value from Airflow XCom pushed via SSHExecuteOperator

Tags: ssh, airflow

I have the following DAG with two SSHExecuteOperator tasks. The first task executes a stored procedure which returns a parameter. The second task needs this parameter as an input.

Could you please explain how to pull the value from the XCom pushed in task1, in order to use it in task2?

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.models import Variable

default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime.now(),
  'email': ['[email protected]'],
  'email_on_failure': True,
  'retries': 0
}

#server must be changed to point to the correct environment, to do so update DataQualitySSHHook variable in Airflow admin
DataQualitySSHHook = Variable.get('DataQualitySSHHook')
print('Connecting to: ' + DataQualitySSHHook)
sshHookEtl = SSHHook(conn_id=DataQualitySSHHook)
sshHookEtl.no_host_key_check = True 

#create dag
dag = DAG(
  'ed_data_quality_test-v0.0.3', #update version whenever you change something
  default_args=default_args,
  schedule_interval="0 0 * * *",
  dagrun_timeout=timedelta(hours=24),
  max_active_runs=1)

#create tasks
task1 = SSHExecuteOperator(
  task_id='run_remote_sp_audit_batch_register',
  bash_command="bash /opt/scripts/data_quality/EXEC_SP_AUDIT_BATCH.sh 'ED_DATA_QUALITY_MANUAL' 'REGISTER' '1900-01-01 00:00:00.000000' '2999-12-31 00:00:00.000000' ", #keep the space at the end
  ssh_hook=sshHookEtl,
  xcom_push=True,
  retries=0,
  dag=dag)

task2 = SSHExecuteOperator(
  task_id='run_remote_sp_audit_module_session_start',
  bash_command="echo {{ ti.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}",
  ssh_hook=sshHookEtl,
  retries=0,
  dag=dag)

#create dependencies
task1.set_downstream(task2)
Alexis.Rolland asked Mar 23 '17 10:03


People also ask

Where is XCom stored Airflow?

XComs are stored in Airflow's metadata database. The key is the identifier of your XCom; it does not need to be unique and is used to retrieve the XCom from a given task.

How does XCom work in Airflow?

XComs (short for “cross-communications”) are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines. An XCom is identified by a key (essentially its name), as well as the task_id and dag_id it came from.
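To make the key / task_id / dag_id identification concrete, here is a toy in-memory model. It is only a sketch and not Airflow's implementation: `ToyXComStore` is a hypothetical class, the value "12345" is made up, and the real table also tracks which DAG run a value belongs to.

```python
class ToyXComStore:
    """Toy stand-in for the XCom table; not Airflow's actual code."""

    def __init__(self):
        # Rows are identified by (dag_id, task_id, key).
        self._rows = {}

    def push(self, dag_id, task_id, value, key="return_value"):
        # A value returned by an operator is stored under the key "return_value".
        self._rows[(dag_id, task_id, key)] = value

    def pull(self, dag_id, task_id, key="return_value"):
        # Returns None when no matching XCom exists.
        return self._rows.get((dag_id, task_id, key))


store = ToyXComStore()

# task1 pushes the value it produced (hypothetical value).
store.push(
    "ed_data_quality_test-v0.0.3",
    "run_remote_sp_audit_batch_register",
    "12345",
)

# task2 pulls it back by naming the upstream task.
batch_id = store.pull(
    "ed_data_quality_test-v0.0.3",
    "run_remote_sp_audit_batch_register",
)
print(batch_id)
```

In a real DAG the `dag_id` is implied by the running task instance, which is why a template only needs `task_ids`.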


2 Answers

The solution I found: when task1 executes the shell script, make sure the value you want captured in the XCom is the last thing the script prints (using echo).

Then I was able to retrieve the XCom variable value with the following code snippet:

{{ task_instance.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}
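The "last thing printed" rule can be checked locally before wiring it into the DAG. The sketch below is an approximation of the behavior described above, not the operator's source: `last_stdout_line` is a hypothetical helper, and the inline Python command is a stand-in for the remote shell script.

```python
import subprocess
import sys


def last_stdout_line(command):
    """Run a command and return the last non-empty line it wrote to stdout."""
    result = subprocess.run(command, capture_output=True, text=True, check=True)
    lines = [line.strip() for line in result.stdout.splitlines() if line.strip()]
    return lines[-1] if lines else None


# Stand-in for the shell script: it logs some text, then prints the value
# that should end up in the XCom as its final line (hypothetical value).
fake_script = [
    sys.executable,
    "-c",
    "print('registering batch'); print('12345')",
]

captured = last_stdout_line(fake_script)
print(captured)
```

If the script logs anything after the value, that trailing log line is what gets captured instead, so the echo of the parameter has to come last.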

Alexis.Rolland answered Nov 15 '22 14:11


Instead of xcom_push=True, try do_xcom_push=True. It pushes all of the stdout to the XCom under the key return_value.
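When the whole stdout is pushed rather than only its last line, the downstream task has to pick the value out itself. The sketch below shows one way to do that. It assumes, beyond what the answer states, that the pushed value may arrive base64-encoded (some versions of the newer SSHOperator appear to do this when XCom pickling is disabled, so check your version); `extract_last_line` is a hypothetical helper name and the sample output is made up.

```python
import base64


def extract_last_line(xcom_value, is_base64=False):
    """Return the last non-empty line of stdout pulled from an XCom."""
    if is_base64:
        # Assumption: the operator pushed base64-encoded stdout.
        text = base64.b64decode(xcom_value).decode("utf-8")
    elif isinstance(xcom_value, bytes):
        text = xcom_value.decode("utf-8")
    else:
        text = xcom_value
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    return lines[-1] if lines else None


# Hypothetical stdout of the remote script: log lines, then the value.
raw_stdout = "Starting batch\nBatch registered\n12345\n"
encoded_stdout = base64.b64encode(raw_stdout.encode("utf-8")).decode("utf-8")

print(extract_last_line(raw_stdout))
print(extract_last_line(encoded_stdout, is_base64=True))
```

Keeping the value on the last line of the script's output means the same helper works whether the operator pushes one line or everything.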

Milan answered Nov 15 '22 13:11