
Airflow: How to push an XCom value from PostgresOperator?

I'm using Airflow 1.8.1 and I want to push the result of a SQL query from a PostgresOperator to XCom.

Here are my tasks:

check_task = PostgresOperator(
    task_id='check_task',
    postgres_conn_id='conx',
    sql="check_task.sql",
    xcom_push=True,
    dag=dag)

def py_is_first_execution(**kwargs):
    value = kwargs['ti'].xcom_pull(task_ids='check_task')
    print('count ----> %s' % value)
    if value == 0:
        return 'next_task'
    else:
        return 'end-flow'

check_branch = BranchPythonOperator(
    task_id='is-first-execution',
    python_callable=py_is_first_execution,
    provide_context=True,
    dag=dag)

And here is my SQL script:

select count(1) from table

When I check the XCom value from check_task, it is None.

Omar14 asked Aug 11 '17 08:08



2 Answers

If I'm correct, Airflow automatically pushes to XCom when an operator's execute method returns a value. However, when you look at the code of the PostgresOperator, you see that its execute method calls the run method of the PostgresHook (an extension of dbapi_hook). Neither method returns anything, so nothing is pushed to XCom. To fix this, we created a CustomPostgresSelectOperator, a copy of the PostgresOperator that does 'return hook.get_records(..)' instead of 'hook.run(..)'.
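To make the mechanism concrete, here is a toy model of it. This is not Airflow code: ToyHook, RunOperator, SelectOperator and run_task are hypothetical stand-ins that only mimic the behaviour described above, and sqlite3 stands in for Postgres so the snippet runs on its own.

```python
import sqlite3


class ToyHook:
    """Hypothetical stand-in for a DB hook (not an Airflow class)."""

    def __init__(self, conn):
        self.conn = conn

    def run(self, sql):
        # Like the hook's run(): executes the statement, returns nothing.
        self.conn.execute(sql)

    def get_records(self, sql):
        # Like get_records(): executes the query and returns all rows.
        return self.conn.execute(sql).fetchall()


class RunOperator:
    """Mimics the stock operator: execute() calls run() and returns None."""

    def __init__(self, hook, sql):
        self.hook = hook
        self.sql = sql

    def execute(self):
        self.hook.run(self.sql)


class SelectOperator(RunOperator):
    """Mimics the custom operator: execute() returns the fetched rows."""

    def execute(self):
        return self.hook.get_records(self.sql)


def run_task(operator, xcom):
    """Mimics the task runner: push execute()'s result unless it is None."""
    result = operator.execute()
    if result is not None:
        xcom["return_value"] = result


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER)")
hook = ToyHook(conn)
sql = "SELECT COUNT(1) FROM t"

xcom_run, xcom_select = {}, {}
run_task(RunOperator(hook, sql), xcom_run)
run_task(SelectOperator(hook, sql), xcom_select)

print(xcom_run)     # {}
print(xcom_select)  # {'return_value': [(0,)]}
```

Note that the pushed value is a list of rows, not a bare number. If the real hook returns rows in the same shape, the branch callable would have to compare value[0][0] to 0 rather than value itself.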

Hope that helps you.

J.Brouwers answered Sep 22 '22 09:09


Finally, I created a new operator, SqlExecuteOperator, as a plugin under $AIRFLOW_HOME/plugins.

I used CheckOperator as an example and modified the returned value: the default behavior of that operator was exactly the reverse of what I needed.

Here's the default operator I started from: CheckOperator

And here is my customized operator:

import logging

# Import paths as used in Airflow 1.x
from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SqlExecuteOperator(BaseOperator):
    """
    Executes a sql query that is expected to return a single row and
    returns the first column of that row as an int. Because ``execute``
    returns a value, Airflow pushes it to XCom.

    The hook is looked up from ``conn_id`` and must provide ``get_first``.

    :param sql: the sql to be executed
    :type sql: string
    :param conn_id: id of the connection to run the sql against
    :type conn_id: string
    """

    template_fields = ('sql',)
    template_ext = ('.hql', '.sql',)
    ui_color = '#fff7e6'

    @apply_defaults
    def __init__(
            self, sql,
            conn_id=None,
            *args, **kwargs):
        super(SqlExecuteOperator, self).__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.sql = sql

    def execute(self, context=None):
        logging.info('Executing SQL statement: %s', self.sql)
        # get_first returns the first row of the result set
        records = self.get_db_hook().get_first(self.sql)
        logging.info('Record: %s', records)
        # The returned value is what ends up in XCom
        return int(records[0])

    def get_db_hook(self):
        return BaseHook.get_hook(conn_id=self.conn_id)
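
One case the operator above does not handle: as far as I know, get_first hands back whatever the cursor's fetchone() gives, which is None when the query yields no row, so records[0] would fail. A count query always yields a row, but for other queries a defensive conversion may help. A minimal sketch, with hypothetical helper names (first_value_as_int and choose_branch are not part of Airflow), together with the branch decision from the question:

```python
def first_value_as_int(record):
    """Return the first column of a row as an int, or None if there is no value."""
    if record is None or len(record) == 0 or record[0] is None:
        return None
    return int(record[0])


def choose_branch(count):
    """Branch decision from the question: continue only when the count is 0."""
    return 'next_task' if count == 0 else 'end-flow'


print(choose_branch(first_value_as_int((0,))))   # next_task
print(choose_branch(first_value_as_int((5,))))   # end-flow
print(choose_branch(first_value_as_int(None)))   # end-flow
```

With this shape, a missing row is treated like a non-zero count and the flow ends. Whether that is the right default depends on the DAG.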

Omar14 answered Sep 22 '22 09:09