 

How to pass a PostgreSQL query result into a variable in Airflow? (PostgresOperator or PostgresHook)

I'm planning to use PostgreSQL as the metadata provider for my tasks, so I want to run a few queries, get some data, and pass it as a filled variable to another task. The problem is that when I use PostgresHook I get the data, but it's inside a Python function that I can't access. In fact, all I see is the line below:

[2021-08-23 13:00:12,628] {python.py:151} INFO - Done. Returned value was: [[1, "inf_account",....]]

Here is part of my code:

import json

from airflow.providers.postgres.hooks.postgres import PostgresHook


def _query_postgres(**context):
    """
    Queries Postgres and pushes the result rows to XCom as a JSON string.
    """

    postgres = PostgresHook(postgres_conn_id="aramis_postgres_connection")
    conn = postgres.get_conn()
    cursor = conn.cursor()
    # execute() returns None in psycopg2; the rows are read from the cursor
    cursor.execute("SELECT * FROM public.aramis_meta_task;")

    # iterate over the cursor to get a list of row tuples
    details_rows = [row for row in cursor]
    conn.close()

    # serialize to a JSON string (default=str turns dates, Decimals, etc. into strings)
    details_json_string = json.dumps(details_rows, default=str)

    task_instance = context['task_instance']
    task_instance.xcom_push(key="my_value", value=details_json_string)
    return details_json_string

But I don't know which variable to use to access it, or how to push it to XCom so that I can use the returned value as a parameter to another BashOperator task (a Spark job, for example).

The PostgresOperator, on the other hand, only returns None.

Aramis NSR asked Sep 20 '25 07:09



1 Answer

The PostgresOperator does not return any values, so unfortunately you can't use that to pass around data. You'll have to implement your own operator, for which you can indeed use the PostgresHook.
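A minimal sketch of that approach, assuming you call the PostgresHook from a PythonOperator callable instead of writing a full custom operator (a lighter, swapped-in variant of the same idea). The query logic lives in a plain function, `fetch_rows` (a hypothetical helper, not part of Airflow), that takes any DB-API 2.0 connection factory. In a DAG you would pass `PostgresHook(postgres_conn_id="aramis_postgres_connection").get_conn`; outside Airflow, the standard-library `sqlite3` module can stand in for Postgres.

```python
from typing import Any, Callable, List


def fetch_rows(get_conn: Callable[[], Any], sql: str) -> List[List[Any]]:
    """Run `sql` on a new DB-API connection and return all rows as lists.

    Hypothetical helper. In Airflow, `get_conn` would be
    PostgresHook(postgres_conn_id="aramis_postgres_connection").get_conn.
    """
    conn = get_conn()
    try:
        cursor = conn.cursor()
        cursor.execute(sql)  # psycopg2 returns None here; rows stay on the cursor
        # fetchall() reads every remaining row; tuples become lists so the
        # value round-trips through JSON unchanged, like [[1, "inf_account", ...]]
        return [list(row) for row in cursor.fetchall()]
    finally:
        # always release the connection, even if the query fails
        conn.close()


# Hypothetical use inside a PythonOperator callable (needs Airflow):
#
#     from airflow.providers.postgres.hooks.postgres import PostgresHook
#
#     def _query_postgres():
#         hook = PostgresHook(postgres_conn_id="aramis_postgres_connection")
#         return fetch_rows(hook.get_conn, "SELECT * FROM public.aramis_meta_task;")
```

Passing a connection factory rather than the hook itself keeps the function testable without a database server: `fetch_rows(lambda: sqlite3.connect("meta.db"), "SELECT * FROM aramis_meta_task")` works the same way against a local SQLite file.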

There are a few things to note about your code:

  1. The "Returned value was" log line is output by the PythonOperator (note the python.py in the log line), which logs whatever your callable returns.
  2. You can push to XCom explicitly with xcom_push(), as you showed, but returning a value also pushes it to XCom automatically (under the key return_value), so your output is stored in XCom twice.
  3. You can pull an XCom value with xcom_pull(); more details here: https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
  4. You can fetch all rows from the Postgres cursor with cursor.fetchall(). I have a similar example here (it writes the output to local disk): https://github.com/godatadriven/airflow-testing-examples/blob/master/src/testing_examples/operators/postgres_to_local_operator.py#L35-L41
  5. Be careful with large data and XComs. By default, XComs are stored in the Airflow metastore database, which is not meant for large payloads. Alternatively, you can configure a custom XCom backend that stores XCom values elsewhere, e.g. in AWS S3: https://www.astronomer.io/guides/custom-xcom-backends
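Points 2, 3 and 5 can be combined roughly as follows. This is a minimal sketch, assuming the callable only returns a JSON string (so the output lands in XCom once, under the default key `return_value`) and that a downstream BashOperator pulls it through Jinja templating. `rows_to_xcom_value`, `MAX_XCOM_BYTES` (an arbitrary example limit, not an Airflow setting), the task IDs and `my_job.py` are all hypothetical; the DAG wiring is left as comments because it needs an Airflow installation.

```python
import json
from typing import Any, Sequence

# Hypothetical limit, not an Airflow setting: pick a size your metastore
# (or custom XCom backend) can comfortably hold.
MAX_XCOM_BYTES = 48 * 1024


def rows_to_xcom_value(rows: Sequence[Sequence[Any]], max_bytes: int = MAX_XCOM_BYTES) -> str:
    """Serialize query rows to a JSON string, refusing payloads that are too big."""
    # default=str turns dates, Decimals, etc. into strings instead of failing
    payload = json.dumps([list(row) for row in rows], default=str)
    size = len(payload.encode("utf-8"))
    if size > max_bytes:
        raise ValueError(
            f"XCom payload is {size} bytes (limit {max_bytes}); "
            "write it to external storage and push only its location"
        )
    return payload


# Hypothetical DAG wiring (needs Airflow; not executed here). Returning the
# string pushes it to XCom under "return_value", and xcom_pull(task_ids=...)
# reads that key by default:
#
#     from airflow.operators.bash import BashOperator
#     from airflow.operators.python import PythonOperator
#     from airflow.providers.postgres.hooks.postgres import PostgresHook
#
#     def _query_postgres():
#         hook = PostgresHook(postgres_conn_id="aramis_postgres_connection")
#         rows = hook.get_records("SELECT * FROM public.aramis_meta_task;")
#         return rows_to_xcom_value(rows)
#
#     query = PythonOperator(task_id="query_postgres", python_callable=_query_postgres)
#     spark = BashOperator(
#         task_id="run_spark",
#         bash_command="spark-submit my_job.py '{{ ti.xcom_pull(task_ids=\"query_postgres\") }}'",
#     )
#     query >> spark
```

Wrapping the templated value in single quotes keeps the JSON as one shell argument, assuming the payload itself contains no single quotes. For larger or messier data, a custom XCom backend (point 5) or pushing only a file path is likely the safer route.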
Bas Harenslak answered Sep 22 '25 21:09
