
How to pass SQL as file with parameters to Airflow Operator

Tags:

airflow

I have an Operator in Airflow:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql="""SELECT * FROM orders where orderid>{0}""".format(parameter),
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

Now, the actual query I need to run is 24 lines long. I want to save it in a file and give the operator the path to the SQL file. The operator supports this, but I'm not sure how to pass the parameter that the SQL needs.

Suggestions?

EDIT: This is my code:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    templates_dict={'sql': '/home/ubuntu/airflow/.../orders_op.sql'},
    sql = '{{ templates_dict.sql }}',
    params={'last_imported_id': LAST_IMPORTED_ORDER_ID, 'table_name' :  TABLE_NAME},
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

This gives:

jinja2.exceptions.UndefinedError: 'templates_dict' is undefined

Programmer120 asked Oct 07 '18 13:10




1 Answer

As you've noticed, the MySqlToGoogleCloudStorageOperator specifies a template_ext with the .sql extension.

First, in your DAG, set template_searchpath to the folder where you put your .sql file:

dag = DAG('my_dag', default_args=default_args, schedule_interval="30 7 * * *", template_searchpath=['/home/ubuntu/airflow/.../myfolder'])

Put your large query in yourfile.sql inside that folder. Note the {{ params.ord_id }} placeholder, which is filled in from the operator's params argument:

SELECT * FROM orders where orderid> {{ params.ord_id }}
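To see what the placeholder turns into, the same text can be rendered by hand with Jinja2. This is only a sketch of the substitution step, not Airflow's own rendering code; the variable names are illustrative and the value 99 is just an example.

```python
from jinja2 import Template

# Same text as yourfile.sql. Airflow makes the operator's `params`
# argument available to the template under the name `params`.
sql_template = "SELECT * FROM orders where orderid> {{ params.ord_id }}"

# Illustrative value only; in a DAG this comes from params={...} on the operator.
rendered_sql = Template(sql_template).render(params={"ord_id": 99})
print(rendered_sql)  # SELECT * FROM orders where orderid> 99
```

The value is pasted into the SQL as text, so it should come from a trusted source such as a constant or an Airflow Variable, not from user input.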

Now in the sql argument of the operator, pass the name of the file.

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql='yourfile.sql',      # file name only; looked up in template_searchpath
    params={"ord_id": 99},   # available in the file as {{ params.ord_id }}
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

Make sure there is no space after the file name. Airflow checks whether the string ends with one of the operator's template_ext extensions (here .sql); if it does, Jinja loads it as a template file, and if it does not, the value is treated as a literal SQL string.
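The effect of a trailing space can be imitated in plain Python. The sketch below is not Airflow's implementation: resolve_sql, TEMPLATE_EXT and search_dir are hypothetical names, and a temporary folder stands in for template_searchpath. It only shows why 'yourfile.sql' is read from disk while 'yourfile.sql ' is kept as a literal string.

```python
import os
import tempfile

# Assumption: stands in for the operator's template_ext.
TEMPLATE_EXT = (".sql",)


def resolve_sql(value, search_paths):
    """Return the file's contents if `value` ends with a template
    extension, otherwise return `value` unchanged."""
    if value.endswith(TEMPLATE_EXT):
        for folder in search_paths:
            candidate = os.path.join(folder, value)
            if os.path.isfile(candidate):
                with open(candidate) as handle:
                    return handle.read()
        raise FileNotFoundError(value)
    return value


# Throwaway folder playing the role of template_searchpath.
search_dir = tempfile.mkdtemp()
with open(os.path.join(search_dir, "yourfile.sql"), "w") as handle:
    handle.write("SELECT * FROM orders where orderid> {{ params.ord_id }}")

from_file = resolve_sql("yourfile.sql", [search_dir])    # ends with .sql: read from disk
as_literal = resolve_sql("yourfile.sql ", [search_dir])  # trailing space: kept as a string
print(repr(from_file))
print(repr(as_literal))
```

In the second call the operator would end up sending the text "yourfile.sql " to MySQL as the query, which is why the trailing space matters.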

Mendhak answered Oct 20 '22 22:10