I have the following DAG with two SSHExecuteOperator tasks. The first task executes a stored procedure which returns a parameter. The second task needs this parameter as an input.
Could you please explain how to pull the value that task1 pushes to XCom, so that I can use it in task2?
```python
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.models import Variable

default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  # note: Airflow discourages a dynamic start_date such as datetime.now(); a fixed date is safer
  'start_date': datetime.now(),
  'email': ['[email protected]'],
  'email_on_failure': True,
  'retries': 0
}

#server must be changed to point to the correct environment, to do so update DataQualitySSHHook variable in Airflow admin
DataQualitySSHHook = Variable.get('DataQualitySSHHook')
print('Connecting to: ' + DataQualitySSHHook)
sshHookEtl = SSHHook(conn_id=DataQualitySSHHook)
sshHookEtl.no_host_key_check = True

#create dag
dag = DAG(
  'ed_data_quality_test-v0.0.3', #update version whenever you change something
  default_args=default_args,
  schedule_interval="0 0 * * *",
  dagrun_timeout=timedelta(hours=24),
  max_active_runs=1)

#create tasks
task1 = SSHExecuteOperator(
  task_id='run_remote_sp_audit_batch_register',
  bash_command="bash /opt/scripts/data_quality/EXEC_SP_AUDIT_BATCH.sh 'ED_DATA_QUALITY_MANUAL' 'REGISTER' '1900-01-01 00:00:00.000000' '2999-12-31 00:00:00.000000' ", #keep the space at the end
  ssh_hook=sshHookEtl,
  xcom_push=True,  # push the last line of the command's output to XCom
  retries=0,
  dag=dag)

task2 = SSHExecuteOperator(
  task_id='run_remote_sp_audit_module_session_start',
  # pull task1's XCom through Jinja; `ti` is the task instance in the template context
  bash_command="echo {{ ti.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}",
  ssh_hook=sshHookEtl,
  retries=0,
  dag=dag)

#create dependencies
task1.set_downstream(task2)
```
An XCom is a small value that is stored in Airflow's metadata database. Its key is the identifier of the XCom; the key does not need to be unique, and together with the task that pushed it, it is used to get the XCom back.
XComs (short for “cross-communications”) are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines. An XCom is identified by a key (essentially its name), as well as the task_id and dag_id it came from.
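To make the identification scheme concrete, here is a toy in-memory model. It is not Airflow's API: `ToyXComStore`, the run identifier `"run_1"` and the value `"12345"` are hypothetical, chosen only to show that a key on its own is not enough and that the pushing task (plus DAG and run; older Airflow versions used the execution date for the run) selects the row. The default key mirrors Airflow's `return_value`.

```python
from typing import Any, Dict, Optional, Tuple

# Default key for a task's returned value (mirrors Airflow's "return_value").
RETURN_KEY = "return_value"


class ToyXComStore:
    """Hypothetical in-memory stand-in for Airflow's XCom table (not Airflow's API)."""

    def __init__(self) -> None:
        # (dag_id, run_id, task_id, key) -> value
        self._rows: Dict[Tuple[str, str, str, str], Any] = {}

    def push(self, dag_id: str, run_id: str, task_id: str, value: Any,
             key: str = RETURN_KEY) -> None:
        # Pushing the same key again from the same task run overwrites the old value.
        self._rows[(dag_id, run_id, task_id, key)] = value

    def pull(self, dag_id: str, run_id: str, task_id: str,
             key: str = RETURN_KEY) -> Optional[Any]:
        # The key alone is shared by many tasks; task_id, DAG and run pick the row.
        return self._rows.get((dag_id, run_id, task_id, key))


store = ToyXComStore()
dag_id = "ed_data_quality_test-v0.0.3"
store.push(dag_id, "run_1", "run_remote_sp_audit_batch_register", "12345")
store.push(dag_id, "run_1", "some_other_task", "999")
print(store.pull(dag_id, "run_1", "run_remote_sp_audit_batch_register"))  # prints 12345
```

Both tasks push under the same default key, yet each pull returns the value of the task that was asked for.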
The solution I found: when task1 runs the shell script, make sure the value you want captured in XCom is the last line the script prints (using echo), because SSHExecuteOperator with xcom_push=True pushes only the last line of the command's output.
Then I was able to retrieve the XCom value with the following snippet:
```
{{ task_instance.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}
```
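The "last line wins" behaviour can be tried locally without Airflow or SSH. This is a minimal sketch, assuming (as I read the Airflow 1.x source) that the operator reads the combined stdout/stderr line by line and returns the final line, stripped. `last_output_line` is a hypothetical helper that runs the command locally with `subprocess` instead of over SSH, and `12345` stands in for the stored procedure's result.

```python
import subprocess


def last_output_line(command: str) -> str:
    """Run a shell command locally and return its last output line, stripped.

    Hypothetical helper: imitates what SSHExecuteOperator(xcom_push=True) pushes,
    but runs the command locally instead of over SSH.
    """
    proc = subprocess.run(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into the same stream
        text=True,
        check=True,  # a non-zero exit code raises, like the operator failing the task
    )
    lines = proc.stdout.splitlines()
    return lines[-1].strip() if lines else ""


# Log lines first, the value for the next task last.
script = "echo 'registering batch...'; echo 'done'; echo 12345"
print(last_output_line(script))  # prints 12345
```

If the script prints anything (even a warning on stderr) after the echo of the value, that later line is what ends up in XCom.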
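Instead of xcom_push=True, try do_xcom_push=True (the parameter used by newer Airflow versions, where SSHExecuteOperator has been replaced by SSHOperator). It pushes all of the command's stdout, not just the last line, to XCom under the key return_value.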
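Because do_xcom_push stores the whole stdout, the downstream side may need to pick the value out itself. This sketch assumes, as I understand recent versions of SSHOperator, that the output is base64-encoded before being pushed when XCom pickling is disabled; check the version you run and set `base64_encoded` accordingly. `value_from_ssh_xcom` is a hypothetical helper, for example for use inside a PythonOperator callable.

```python
import base64


def value_from_ssh_xcom(xcom_value: str, base64_encoded: bool = True) -> str:
    """Return the last non-empty line of stdout pushed to XCom by an SSH task.

    Assumption: base64_encoded=True matches SSHOperator with XCom pickling disabled.
    """
    text = base64.b64decode(xcom_value).decode("utf-8") if base64_encoded else xcom_value
    # Keep only non-empty lines; the wanted value is expected to be printed last.
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    return lines[-1] if lines else ""


# Simulated XCom value: base64 of the full stdout of the remote script.
pushed = base64.b64encode(b"registering batch...\ndone\n12345\n").decode("ascii")
print(value_from_ssh_xcom(pushed))  # prints 12345
```

Ignoring blank lines here is a design choice so that a trailing newline or empty echo does not hide the value.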