I would like to use the execution date as a parameter in my SQL file. I tried:
dt = '{{ ds }}'
s3_to_redshift = PostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    params={'file': dt},
    dag=dag
)
but it doesn't work.
You can pass parameters from the CLI using --conf '{"key":"value"}' and then use them in the DAG file as "{{ dag_run.conf["key"] }}" in a templated field.
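For example, you could trigger a run with a custom payload like this (the DAG id my_dag is hypothetical; in Airflow 2.x the equivalent command is airflow dags trigger):

airflow trigger_dag my_dag --conf '{"file": "2019-01-01"}'

Any templated field in that run can then read the value with {{ dag_run.conf["file"] }}.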
dt = '{{ ds }}'

doesn't work because Jinja (the templating engine used within Airflow) does not process the entire DAG definition file. For each operator, Jinja processes only the fields named in that operator's template_fields attribute, which is part of the operator's definition.
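You can check this on the class itself; by default the stock PostgresOperator templates only its sql field (the import path below is the Airflow 1.x one and differs in 2.x):

from airflow.operators.postgres_operator import PostgresOperator

print(PostgresOperator.template_fields)  # ('sql',)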
In this case, you can make the params field (which is actually called parameters, so make sure to change the name) templated by extending PostgresOperator like this:
class MyPostgresOperator(PostgresOperator):
    template_fields = ('sql', 'parameters')
Now you should be able to do:
s3_to_redshift = MyPostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    parameters={'file': '{{ ds }}'},
    dag=dag
)
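Because parameters is handed to the database driver as query parameters, the rendered execution date arrives in s3_to_redshift.sql via psycopg2's named-parameter style, for example (the table and column names here are hypothetical):

DELETE FROM staging_events WHERE load_date = %(file)s;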