Airflow: pass {{ ds }} as param to PostgresOperator

I would like to pass the execution date as a parameter to my SQL file.

I tried:

dt = '{{ ds }}'

s3_to_redshift = PostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    params={'file': dt},
    dag=dag
)

but it doesn't work.

asked Jun 13 '17 at 13:06 by Omar14


People also ask

How do you pass runtime arguments to an Airflow DAG?

You can pass parameters from the CLI when triggering a DAG run with --conf '{"key":"value"}', and then read them in the DAG file as {{ dag_run.conf['key'] }} inside a templated field.

What is Airflow used for?

Apache Airflow is used to schedule and orchestrate data pipelines, also called workflows. Orchestrating data pipelines means sequencing, coordinating, scheduling, and managing complex pipelines that draw on diverse sources.

What is postgres_conn_id?

postgres_conn_id (str) – the Postgres connection ID that references a specific Postgres database.
autocommit (bool) – if True, each command is committed automatically (default: False).
parameters (Iterable | Mapping | None) – (optional) the parameters to render the SQL query with.
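To illustrate what passing parameters alongside a query means, here is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in for Postgres. That substitution is an assumption made only so the example is self-contained, and the loads table is hypothetical. The query text and the values travel separately, and the driver, not string formatting, fills in the placeholders. sqlite3 writes named placeholders as :file, whereas psycopg2 (the Postgres driver) writes them as %(file)s.

```python
import sqlite3

# In-memory database standing in for Postgres; the "loads" table is hypothetical.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE loads (file TEXT)")

# The query text and the parameters are passed separately; the driver fills in
# each :name placeholder (psycopg2 would use %(name)s instead).
parameters = {"file": "2017-06-13"}
cur.execute("INSERT INTO loads (file) VALUES (:file)", parameters)

cur.execute("SELECT file FROM loads WHERE file = :file", parameters)
rows = cur.fetchall()
print(rows)  # [('2017-06-13',)]
conn.close()
```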

How do I connect to a PostgreSQL database in Airflow?

Configuring the connection:
- Specify the schema name to be used in the database.
- Specify the user name to connect with.
- Specify the password to connect with.
- Specify the extra parameters (as a JSON dictionary) that can be used in the Postgres connection.


1 Answer

dt = '{{ ds }}'

This doesn't work because Jinja (the templating engine used by Airflow) does not process the entire DAG definition file. The DAG file is ordinary Python, so dt is simply a string containing the characters {{ ds }}.

Instead, each operator declares the fields that Jinja will render when the task runs (its template_fields attribute), and only the values of those fields are processed.
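As a rough illustration of that rule, here is a toy model in plain Python. It does not use Airflow or Jinja: the render function, the ToyOperator class and the regular-expression substitution are hypothetical stand-ins that only mimic how attributes named in template_fields get rendered while everything else is left alone.

```python
import re

# Toy stand-in for Jinja: replaces {{ name }} with a value from the context.
# This is NOT Airflow's renderer; it only mimics the behaviour described above.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(text, context):
    """Substitute every {{ name }} placeholder found in a string."""
    return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), text)


class ToyOperator:
    # Only the attributes named here are rendered when the task runs.
    template_fields = ("sql",)

    def __init__(self, sql, params=None):
        self.sql = sql
        self.params = params or {}

    def render_template_fields(self, context):
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


# Module level: evaluated when the DAG file is parsed, so it is just a string.
dt = "{{ ds }}"

op = ToyOperator(sql="SELECT '{{ ds }}'", params={"file": dt})
op.render_template_fields({"ds": "2017-06-13"})

print(op.sql)     # SELECT '2017-06-13'
print(op.params)  # {'file': '{{ ds }}'}
```

Because params is not one of the toy's template fields, the dict still holds the literal text {{ ds }} after rendering, which mirrors what the question runs into.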

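In this case, pass the value through the operator's parameters argument rather than params. They are different arguments: params is only made available to Jinja templates (as {{ params.file }}), while parameters is handed to the database driver together with the query. You can then make parameters templated by extending PostgresOperator like this: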

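from airflow.providers.postgres.operators.postgres import PostgresOperator  # Airflow 1.x: airflow.operators.postgres_operator

class MyPostgresOperator(PostgresOperator):
    template_fields = ('sql', 'parameters')  # render parameters with Jinja as well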

Now you should be able to do:

s3_to_redshift = MyPostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    parameters={'file': '{{ ds }}'},
    dag=dag
)
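The effect of the subclass can be sketched with a toy model in plain Python. This is not Airflow's actual code, and ToyPostgresOperator, MyToyPostgresOperator and the query string are hypothetical. Airflow's own renderer also walks into dict and list values, so listing parameters in template_fields renders the values inside the dict; the toy renderer mimics that by recursing into containers.

```python
import re

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(content, context):
    """Toy renderer: strings are substituted, dicts/lists/tuples are walked recursively."""
    if isinstance(content, str):
        return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), content)
    if isinstance(content, dict):
        return {key: render(value, context) for key, value in content.items()}
    if isinstance(content, (list, tuple)):
        return type(content)(render(item, context) for item in content)
    return content  # other types (numbers, None, ...) are left untouched


class ToyPostgresOperator:
    """Hypothetical stand-in for PostgresOperator: only sql is templated."""

    template_fields = ("sql",)

    def __init__(self, sql, parameters=None):
        self.sql = sql
        self.parameters = parameters

    def render_template_fields(self, context):
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


class MyToyPostgresOperator(ToyPostgresOperator):
    # Same trick as the answer: list parameters as a templated field too.
    template_fields = ("sql", "parameters")


context = {"ds": "2017-06-13"}
sql = "SELECT * FROM my_table WHERE load_date = %(file)s"  # hypothetical query

plain = ToyPostgresOperator(sql=sql, parameters={"file": "{{ ds }}"})
plain.render_template_fields(context)

custom = MyToyPostgresOperator(sql=sql, parameters={"file": "{{ ds }}"})
custom.render_template_fields(context)

print(plain.parameters)   # {'file': '{{ ds }}'}
print(custom.parameters)  # {'file': '2017-06-13'}
```

Because parameters is passed to the database driver together with the query rather than substituted by Jinja, the SQL file would refer to the value with a driver placeholder. psycopg2, which PostgresOperator uses through PostgresHook, writes named placeholders as %(file)s.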
answered Sep 21 '22 at 04:09 by jhnclvr