```python
def mysql_operator_test():
    DEFAULT_DATE = datetime(2017, 10, 9)
    t = MySqlOperator(
        task_id='basic_mysql',
        sql="SELECT count(*) from table 1 where id>100;",
        mysql_conn_id='mysql_default',
        dag=dag)
    t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=False)


run_this = PythonOperator(
    task_id='getRecoReq',
    python_callable=mysql_operator_test,
    # xcom_push=True,
    dag=dag)

task2 = PythonOperator(
    task_id='mysql_select',
    provide_context=True,
    python_callable=blah,
    templates_dict={'requests': "{{ ti.xcom_pull(task_ids='getReq') }}"},
    dag=dag)

run_this.set_downstream(task2)
```
I want to capture the count returned by the `MySqlOperator` using XComs. Can someone please guide me on how to do this?
You're very close! However, the way you're asking this question is kind of an anti-pattern. You don't want to share data across tasks in Airflow. Also, you don't want to use the operator the way you do in `mysql_operator_test`. It's tempting; I did the same thing when I was getting started.
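To see why the downstream task gets nothing from `xcom_pull` in the question's DAG, here is a toy, in-memory model of the XCom matching rule. It is a hypothetical simplification (`ToyXCom`, `run_python_task` and the sample callables are made up for illustration and are not Airflow's API) that encodes two behaviours as I understand them: a `PythonOperator` pushes its callable's non-`None` return value under the key `return_value`, and `ti.xcom_pull(task_ids=...)` returns `None` when no matching XCom exists. The question's `mysql_operator_test` returns nothing, and the template pulls from `getReq` while the task is named `getRecoReq`, so both paths come back empty.

```python
from typing import Any, Callable, Dict, Optional, Tuple


class ToyXCom:
    """Hypothetical in-memory XCom store for a single DAG run, keyed by (task_id, key)."""

    def __init__(self) -> None:
        self._store: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, key: str, value: Any) -> None:
        self._store[(task_id, key)] = value

    def pull(self, task_id: str, key: str = "return_value") -> Optional[Any]:
        # A miss returns None instead of raising, like ti.xcom_pull(task_ids=...).
        return self._store.get((task_id, key))


def run_python_task(xcom: ToyXCom, task_id: str, fn: Callable[[], Any]) -> None:
    """Mimics a PythonOperator: a non-None return value is pushed as 'return_value'."""
    result = fn()
    if result is not None:
        xcom.push(task_id, "return_value", result)


def like_mysql_operator_test() -> None:
    """Like the question's callable: it may run a query, but returns nothing."""
    return None


def returns_a_count() -> int:
    """Hypothetical callable that returns the row count (42 is a made-up value)."""
    return 42


xcom = ToyXCom()
run_python_task(xcom, "getRecoReq", like_mysql_operator_test)
print(xcom.pull("getRecoReq"))  # None: the callable returned nothing

run_python_task(xcom, "getRecoReq", returns_a_count)
print(xcom.pull("getRecoReq"))  # 42
print(xcom.pull("getReq"))      # None: no task with this id pushed anything
```

The model ignores DAG runs, execution dates and serialization; it only illustrates that the callable has to return the value and that the pulled `task_ids` has to match the pushing task's id.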
I tried something very similar to this but with SFTP connections. I eventually just did everything inside of the `PythonOperator` and used the underlying hooks.
I'd recommend using the `MySqlHook` inside a `python_callable`. Something like this:
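```python
from airflow.hooks.mysql_hook import MySqlHook  # Airflow 2: airflow.providers.mysql.hooks.mysql


def count_mysql_and_then_use_the_count():
    """
    Counts rows in MySQL using the MySqlHook, then uses the count
    """
    mysql_hook = MySqlHook(...)  # e.g. the mysql_conn_id from the question
    conn = mysql_hook.get_conn()  # plain DB-API connection
    cur = conn.cursor()
    cur.execute("""SELECT count(*) from table 1 where id>100""")
    for count in cur:  # each row is a 1-tuple holding the count
        # Do something with the count...
        pass
```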
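A runnable sketch of the same idea, with one swap: Python's built-in `sqlite3` stands in for the DB-API connection that `MySqlHook(...).get_conn()` would return, so it runs without Airflow or a MySQL server. `count_rows_over`, `demo_conn` and the table name `table1` are hypothetical. Since `SELECT count(*)` always produces exactly one row, `fetchone()` is enough and no loop is needed.

```python
import sqlite3
from contextlib import closing
from typing import Any, Callable


def count_rows_over(get_conn: Callable[[], Any], min_id: int) -> int:
    """Open a connection, run the COUNT query, close everything, return the integer."""
    with closing(get_conn()) as conn:
        with closing(conn.cursor()) as cur:
            # sqlite3 uses "?" placeholders; MySQL drivers use "%s" instead.
            cur.execute("SELECT count(*) FROM table1 WHERE id > ?", (min_id,))
            (count,) = cur.fetchone()  # COUNT(*) always returns exactly one row
    return count


def demo_conn() -> sqlite3.Connection:
    """Hypothetical stand-in for MySqlHook(...).get_conn(): a throwaway DB with ids 1..200."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE table1 (id INTEGER)")
    conn.executemany("INSERT INTO table1 (id) VALUES (?)", [(i,) for i in range(1, 201)])
    return conn


print(count_rows_over(demo_conn, 100))  # 100
```

Inside Airflow the callable would pass the hook's factory instead, e.g. `count_rows_over(MySqlHook(...).get_conn, 100)` (untested), with the `?` placeholder changed to `%s`, which is what the MySQL drivers Airflow uses expect. Taking the connection factory as a parameter keeps the counting logic testable without a live database. `DbApiHook.get_first(sql)` is another option that returns the first row directly.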
I'm not sure this will work as is, but the idea is to use a hook inside your Python callable. I don't use the `MySqlHook` often, but I did this with the `SSHHook` and it's been working great.