
Airflow - Defining the key,value for a xcom_push function

Tags:

airflow

I am trying to pass a Python function in Airflow, and I am not sure what the key and value should be for the xcom_push function. Could anyone assist with this? Thanks.

def db_log(**context):
  db_con = psycopg2.connect(" dbname = 'name' user = 'user' password = 'pass' host = 'host' port = '5439' sslmode = 'require' ")
  task_instance = context['task_instance']
  task_instance.xcom_push(key=db_con, value = db_log)
  return (db_con)

Could anyone help me get the correct key and value for the xcom_push call? Thanks.

dark horse asked May 30 '18 09:05


People also ask

What is Xcom_push?

XComs are explicitly “pushed” and “pulled” to/from their storage using the xcom_push and xcom_pull methods on Task Instances. Many operators will auto-push their results into an XCom key called return_value if the do_xcom_push argument is set to True (as it is by default), and @task functions do this as well.

What is Xcom_pull in XCOM Airflow?

XCom push/pull just adds/retrieves a row from the xcom table in the airflow DB based on DAG id, execution date, task id, and key.


2 Answers

The Airflow example DAGs show the correct way to call it, e.g.: https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_xcom.py

So the call should be:

task_instance.xcom_push(key=<string identifier>, value=<actual value / object>)

In your case

task_instance.xcom_push(key="db_con", value=db_con)
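
A minimal sketch of the call shape described above: the key is a plain string and the value is the object to share. Two assumptions are worth flagging. `FakeTaskInstance` is a hypothetical stand-in so the snippet runs outside Airflow, and the sketch pushes a dict of connection parameters under the made-up key `db_conn_params` rather than the live `db_con` object, on the assumption that XCom values have to be serialized into the Airflow database and an open database connection generally cannot be.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance (illustration only)."""

    def __init__(self):
        self.store = {}

    def xcom_push(self, key, value):
        # The real method writes a row to the xcom table; here we use a dict.
        self.store[key] = value


def db_log(**context):
    # Plain, serializable connection settings (placeholders from the question;
    # the password is left out on purpose).
    conn_params = {
        "dbname": "name",
        "user": "user",
        "host": "host",
        "port": "5439",
        "sslmode": "require",
    }
    task_instance = context["task_instance"]
    # key is a string identifier, value is the object to share.
    task_instance.xcom_push(key="db_conn_params", value=conn_params)
    return conn_params


# Outside Airflow, the context has to be supplied by hand.
ti = FakeTaskInstance()
result = db_log(task_instance=ti)
```

In a real DAG, Airflow supplies `context["task_instance"]` itself, and a downstream task could open its own connection from the pulled parameters.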
tobi6 answered Oct 10 '22 10:10


This is a bit old, but from what I understand, if you are running db_log as a task, then returning db_con would automatically push it to XCom under the key return_value.

You could then access it with {{ti.xcom_pull(task_ids='TASK_NAME_HERE')}}
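
The same pull can also be done from a Python callable through the task context. The sketch below is only an illustration: `FakeTaskInstance` is a hypothetical stand-in so it runs outside Airflow, `db_log_task` is a made-up task id, and it relies on `xcom_pull` defaulting to the key `return_value` when no key is given.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance (illustration only)."""

    def __init__(self, xcoms):
        # Maps (task_id, key) -> value, loosely mimicking rows in the xcom table.
        self._xcoms = xcoms

    def xcom_pull(self, task_ids, key="return_value"):
        return self._xcoms.get((task_ids, key))


def use_db_log_result(**context):
    ti = context["task_instance"]
    # No key given, so the upstream task's returned value is pulled.
    return ti.xcom_pull(task_ids="db_log_task")


# Pretend the upstream task returned this value.
ti = FakeTaskInstance({("db_log_task", "return_value"): {"host": "host"}})
pulled = use_db_log_result(task_instance=ti)
```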

Alex oladele answered Oct 10 '22 08:10