I am trying to pass a Python function in Airflow. I am not sure what the key and values should be for a xcom_push function. Could anyone assist on this. Thanks
def db_log(**context):
db_con = psycopg2.connect(" dbname = 'name' user = 'user' password = 'pass' host = 'host' port = '5439' sslmode = 'require' ")
task_instance = context['task_instance']
task_instance.xcom_push(key=db_con, value = db_log)
return (db_con)
Could anyone assist in getting the correct key and value for the xcom_push function. Thanks..
XComs are explicitly “pushed” and “pulled” to/from their storage using the xcom_push and xcom_pull methods on Task Instances. Many operators will auto-push their results into an XCom key called return_value if the do_xcom_push argument is set to True (as it is by default), and @task functions do this as well.
XCom push/pull just adds/retrieves a row from the xcom table in the airflow DB based on DAG id, execution date, task id, and key.
In examples the correct way of calling can be found, e.g.: https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_xcom.py
So here it should be
task_instance.xcom_push(key=<string identifier>, value=<actual value / object>)
In your case
task_instance.xcom_push(key="db_con", value=db_con)
This is a bit old, but from what I understand, if you are running db_log as a task, then returning db_con would automatically push it to the xcom.
You could then access it with {{ti.xcom_pull(task_ids='TASK_NAME_HERE')}}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With