I'm using Airflow 1.8.1 and I want to push the result of a SQL query from a PostgresOperator.
Here are my tasks:
check_task = PostgresOperator(
    task_id='check_task',
    postgres_conn_id='conx',
    sql="check_task.sql",
    xcom_push=True,
    dag=dag)
def py_is_first_execution(**kwargs):
    value = kwargs['ti'].xcom_pull(task_ids='check_task')
    print 'count ----> ', value
    if value == 0:
        return 'next_task'
    else:
        return 'end-flow'
check_branch = BranchPythonOperator(
    task_id='is-first-execution',
    python_callable=py_is_first_execution,
    provide_context=True,
    dag=dag)
and here is my SQL script:

select count(1) from table

When I check the XCom value from check_task, it retrieves a None value.
XComs are explicitly “pushed” and “pulled” to/from their storage using the xcom_push and xcom_pull methods on Task Instances. Many operators will auto-push their results into an XCom key called return_value if the do_xcom_push argument is set to True (as it is by default), and @task functions do this as well.
Pulling an XCom with xcom_pull: in order to pull an XCom from a task, you use the xcom_pull method. Like xcom_push, this method is available through a task instance object. xcom_pull expects two arguments: task_ids (only XComs from tasks matching those ids will be pulled) and key (which defaults to return_value).
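As a minimal sketch of that push/pull pattern (the function names, task id and key here are hypothetical, assuming the callables are wired up with PythonOperator and provide_context=True on Airflow 1.x):

def push_count(**kwargs):
    # Push under an explicit key; a plain return value would instead land
    # in the default 'return_value' key.
    kwargs['ti'].xcom_push(key='row_count', value=42)

def pull_count(**kwargs):
    # 'key' defaults to 'return_value'; here we pull the explicit key.
    count = kwargs['ti'].xcom_pull(task_ids='push_task', key='row_count')
    return count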
As mentioned in the docs, if you are happy with the risk and know that no one is going to infiltrate your environment, you can set enable_xcom_pickling = True to store XComs with pickling instead of JSON, and then you won't get that serialization error.
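For reference, a sketch of where that setting goes, assuming a standard airflow.cfg layout:

[core]
# Allow arbitrary (picklable) Python objects in XCom instead of JSON only.
enable_xcom_pickling = True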
If I'm correct, Airflow automatically pushes to XCom when a query returns a value. However, when you look at the code of the PostgresOperator you see that its execute method calls the run method of the PostgresHook (an extension of DbApiHook). Neither method returns anything, so nothing is pushed to XCom. What we did to fix this is create a CustomPostgresSelectOperator, a copy of the PostgresOperator, that does 'return hook.get_records(..)' instead of 'hook.run(..)'.
Hope that helps you.
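A minimal sketch of what that custom operator could look like with Airflow 1.8-era imports (the class name and the parameters argument are illustrative, not the answerer's actual code):

from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class CustomPostgresSelectOperator(BaseOperator):
    """Like PostgresOperator, but returns the query results so they are
    pushed to XCom under the default 'return_value' key."""

    template_fields = ('sql',)
    template_ext = ('.sql',)

    @apply_defaults
    def __init__(self, sql, postgres_conn_id='postgres_default',
                 parameters=None, *args, **kwargs):
        super(CustomPostgresSelectOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.postgres_conn_id = postgres_conn_id
        self.parameters = parameters

    def execute(self, context):
        hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        # get_records (instead of run) returns the rows, so the operator's
        # return value ends up in XCom.
        return hook.get_records(self.sql, parameters=self.parameters)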
Finally, I created a new operator, SqlExecuteOperator, in the plugin manager under $AIRFLOW_HOME/plugins. I used the default CheckOperator as an example and modified the returned value: the basic behaviour of that operator was exactly the reverse of what I needed. Here is my customized operator:
import logging

from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SqlExecuteOperator(BaseOperator):
    """
    Executes a sql statement against a db and returns the first value of
    the first row, so that it gets pushed to XCom.

    :param sql: the sql to be executed
    :type sql: string
    :param conn_id: reference to the database connection
    :type conn_id: string
    """
    template_fields = ('sql',)
    template_ext = ('.hql', '.sql',)
    ui_color = '#fff7e6'

    @apply_defaults
    def __init__(
            self, sql,
            conn_id=None,
            *args, **kwargs):
        super(SqlExecuteOperator, self).__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.sql = sql

    def execute(self, context=None):
        logging.info('Executing SQL statement: ' + self.sql)
        # get_first returns the first row of the result set.
        records = self.get_db_hook().get_first(self.sql)
        logging.info("Record: " + str(records))
        records_int = int(records[0])
        print(records_int)
        return records_int

    def get_db_hook(self):
        return BaseHook.get_hook(conn_id=self.conn_id)
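For completeness, a sketch of how this operator plugs into the original branching logic (reusing the task and connection ids from the question, and assuming SqlExecuteOperator is importable from the plugin):

check_task = SqlExecuteOperator(
    task_id='check_task',
    conn_id='conx',
    sql='check_task.sql',
    dag=dag)

def py_is_first_execution(**kwargs):
    # The operator returns an int, which lands in XCom under 'return_value'.
    value = kwargs['ti'].xcom_pull(task_ids='check_task')
    return 'next_task' if value == 0 else 'end-flow'

check_branch = BranchPythonOperator(
    task_id='is-first-execution',
    python_callable=py_is_first_execution,
    provide_context=True,
    dag=dag)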