 

How to use Airflow XComs with MySqlOperator

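from datetime import datetime

from airflow import DAG
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator

# Hypothetical DAG definition; the question does not show how dag was created
dag = DAG('mysql_xcom_example', start_date=datetime(2017, 10, 9), schedule_interval=None)

def mysql_operator_test():
    DEFAULT_DATE = datetime(2017, 10, 9)
    t = MySqlOperator(
        task_id='basic_mysql',
        sql="SELECT count(*) FROM table1 WHERE id > 100;",
        mysql_conn_id='mysql_default',
        dag=dag)
    t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=False)

def blah(**context):
    # Placeholder for the downstream callable, which the question does not show
    print(context['templates_dict']['requests'])

run_this = PythonOperator(
    task_id='getRecoReq',
    python_callable=mysql_operator_test,
    # xcom_push=True,
    dag=dag)

task2 = PythonOperator(
    task_id='mysql_select',
    provide_context=True,
    python_callable=blah,
    # Pull from the upstream task's id (getRecoReq)
    templates_dict={'requests': "{{ ti.xcom_pull(task_ids='getRecoReq') }}"},
    dag=dag)

run_this.set_downstream(task2)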

I want to capture the count returned by the MySqlOperator using XComs. Can someone please explain how to do this?

asked Oct 10 '17 at 15:10 by gpk27



1 Answer

You're very close! However, the approach you're asking about is a bit of an anti-pattern. In general, you don't want to share data across tasks in Airflow. Also, you don't want to create an operator and call its run() method inside another task's callable, as you do in mysql_operator_test. It's tempting; I did the same thing when I was getting started.
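To see why nothing useful comes back from the pull in the question: when a PythonOperator's callable returns a value that is not None, Airflow pushes it to XCom under the key return_value, and ti.xcom_pull(task_ids=...) reads it back by task id. mysql_operator_test has no return statement, so there is nothing to push. The sketch below is only a toy model of that behavior using a plain dict, not Airflow's real XCom storage; the task id count_task and the value 42 are hypothetical.

```python
from typing import Any, Callable, Dict, Optional, Tuple

# Toy stand-in for Airflow's XCom storage: (task_id, key) -> value
XComStore = Dict[Tuple[str, str], Any]

# Key under which Airflow stores a task's return value
RETURN_KEY = "return_value"


def run_task(store: XComStore, task_id: str, fn: Callable[[], Any]) -> None:
    """Run fn and, mirroring Airflow, push its result only if it is not None."""
    result = fn()
    if result is not None:
        store[(task_id, RETURN_KEY)] = result


def xcom_pull(store: XComStore, task_id: str) -> Optional[Any]:
    """Return the value a task pushed, or None if it pushed nothing."""
    return store.get((task_id, RETURN_KEY))


def runs_operator_without_returning() -> None:
    # Like mysql_operator_test: does some work but returns nothing
    pass


def returns_count() -> int:
    # Hypothetical count; in a real DAG this would come from the query
    return 42


store: XComStore = {}
run_task(store, "getRecoReq", runs_operator_without_returning)
run_task(store, "count_task", returns_count)
print(xcom_pull(store, "getRecoReq"))  # None
print(xcom_pull(store, "count_task"))  # 42
```

The takeaway is that the upstream callable itself has to return the count; running an operator inside it does not hand the query result back to the callable.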

I tried something very similar with SFTP connections and eventually just did everything inside the PythonOperator, using the underlying hooks directly.

I'd recommend using the MySqlHook inside a python_callable instead. Something like this:

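from airflow.hooks.mysql_hook import MySqlHook

def count_mysql_and_then_use_the_count():
    """
    Counts the matching rows using the MySqlHook, then uses the count.
    """
    # Same connection id as in the question
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
    conn = mysql_hook.get_conn()
    cur = conn.cursor()
    cur.execute("""SELECT count(*) FROM table1 WHERE id > 100""")
    (count,) = cur.fetchone()
    # Do something with the count...
    # Returning it also pushes it to XCom for downstream tasks
    return count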

I'm not sure this will work as-is, but the idea is to use a hook inside your Python callable. I don't use the MySqlHook often, but I've done the same thing with the SSHHook and it has been working great.
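The same "open a connection inside the callable, run the query, return the count" pattern can be tried without an Airflow installation. In this sketch, make_demo_conn is a hypothetical stand-in for MySqlHook(...).get_conn() that builds an in-memory SQLite database (Python's built-in sqlite3 module) with ids 1 to 150 in table1; it is an illustration under that assumption, not how MySqlHook works internally.

```python
import sqlite3
from typing import Callable


def make_demo_conn() -> sqlite3.Connection:
    """Hypothetical stand-in for MySqlHook(...).get_conn().

    Builds an in-memory SQLite database with ids 1..150 in table1.
    """
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE table1 (id INTEGER PRIMARY KEY)")
    conn.executemany(
        "INSERT INTO table1 (id) VALUES (?)", [(i,) for i in range(1, 151)]
    )
    conn.commit()
    return conn


def count_rows_over(get_conn: Callable[[], sqlite3.Connection], min_id: int) -> int:
    """Open a connection, count rows with id > min_id, and return the count.

    In an Airflow task, returning the value is what makes it available
    to downstream tasks via XCom.
    """
    conn = get_conn()
    try:
        cur = conn.cursor()
        # SQLite uses ? placeholders; MySQL drivers such as MySQLdb use %s
        cur.execute("SELECT count(*) FROM table1 WHERE id > ?", (min_id,))
        (count,) = cur.fetchone()
        return count
    finally:
        conn.close()


if __name__ == "__main__":
    print(count_rows_over(make_demo_conn, 100))  # 50
```

Passing the connection factory in as an argument keeps the counting logic testable on its own. With the real hook, DbApiHook subclasses such as MySqlHook also provide get_first(sql), which returns the first row, so mysql_hook.get_first(sql)[0] is a shorter route to the same count.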

answered Sep 28 '22 at 01:09 by Mike
