I am really a newbie on this forum, but I have been playing with Airflow for some time at our company. Sorry if this question sounds really dumb.
I am writing a pipeline using a bunch of BashOperators. Basically, for each task, I want to simply call a REST API using 'curl'.
This is what my pipeline looks like (very simplified version):
from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(
        datetime_obj.today() - datetime.timedelta(1),
        datetime_obj.min.time()),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}

current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run',
    default_args=default_args,
    schedule_interval=datetime.timedelta(minutes=60))

curl_cmd = 'curl -XPOST "' + hostname + ':8000/run?st=' + current_datetime + '"'

t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)
If you notice, I am doing current_datetime = datetime_obj.now(tz=tz.tzlocal()).
Instead, what I want here is 'execution_date'.
How do I use 'execution_date' directly and assign it to a variable in my Python file?
I am having this general issue of accessing args. Any help will be genuinely appreciated.
Thanks
Variables in Airflow are a generic way to store and retrieve arbitrary content or settings as a simple key-value store within Airflow. Variables can be listed, created, updated, and deleted from the UI (Admin -> Variables), code, or CLI. In addition, JSON settings files can be bulk uploaded through the UI.
The execution time in Airflow is not the actual run time, but rather the start timestamp of its schedule period. For example, the execution time of the first DAG run is 2019-12-05 07:00:00, though it is actually executed on 2019-12-06.
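To make that relationship concrete, here is a minimal plain-Python sketch (no Airflow required) of how a run's execution_date relates to the time it actually runs, assuming a daily schedule interval and an arbitrary example date:

```python
import datetime

# For a scheduled DAG run, execution_date marks the START of the
# schedule period; the run is only triggered once the period has ENDED.
schedule_interval = datetime.timedelta(days=1)

execution_date = datetime.datetime(2019, 12, 5, 7, 0, 0)  # period start
actual_run_time = execution_date + schedule_interval      # period end

print(execution_date)   # 2019-12-05 07:00:00
print(actual_run_time)  # 2019-12-06 07:00:00
```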
The BashOperator's bash_command argument is a template. You can access execution_date in any template as a datetime object using the execution_date variable. In the template, you can use any jinja2 methods to manipulate it.
Using the following as your BashOperator bash_command string:
# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}

# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}
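You can check what those template expressions evaluate to with plain datetime arithmetic, outside of any template (a sketch using an arbitrary example date in place of the real execution_date):

```python
import datetime

# Stand-in for the execution_date object a template would receive.
execution_date = datetime.datetime(2019, 12, 5)

# Same expressions as in the template above:
first_of_month = execution_date.replace(day=1)
last_of_prev_month = execution_date.replace(day=1) - datetime.timedelta(days=1)

print(first_of_month.date())      # 2019-12-01
print(last_of_prev_month.date())  # 2019-11-30
```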
If you just want the string equivalent of the execution date, ds will return a datestamp (YYYY-MM-DD), ds_nodash returns the same without dashes (YYYYMMDD), and so on. More on macros is available in the API docs.
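Under the hood, ds and ds_nodash are just formatted string views of execution_date. A plain-Python sketch of the equivalent values (using an arbitrary example date):

```python
import datetime

execution_date = datetime.datetime(2019, 12, 5)

ds = execution_date.strftime("%Y-%m-%d")       # what {{ ds }} renders to
ds_nodash = execution_date.strftime("%Y%m%d")  # what {{ ds_nodash }} renders to

print(ds)         # 2019-12-05
print(ds_nodash)  # 20191205
```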
Your final operator would look like:
command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()

t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=command,
    dag=dag)
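Note there are two substitution steps here: the %-interpolation fills in hostname when the DAG file is parsed, while {{ ds }} is left for Airflow to render at task run time. A sketch of both steps, assuming hostname is 'localhost' (an illustrative value) and mimicking the Jinja rendering by hand:

```python
hostname = "localhost"  # assumed value, for illustration only

# Step 1: % interpolation happens immediately in your DAG file.
command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
print(command)  # curl -XPOST 'localhost:8000/run?st={{ ds }}'

# Step 2: Airflow renders the Jinja placeholder per run; we mimic it here.
ds = "2019-12-05"
rendered = command.replace("{{ ds }}", ds)
print(rendered)  # curl -XPOST 'localhost:8000/run?st=2019-12-05'
```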
The PythonOperator constructor takes a 'provide_context' parameter (see https://pythonhosted.org/airflow/code.html). If it's True, then it passes a number of parameters into the python_callable via kwargs. kwargs['execution_date'] is what you want, I believe.
Something like this:
from airflow.models import Variable  # needed for Variable.set below

def python_method(ds, **kwargs):
    Variable.set('execution_date', kwargs['execution_date'])
    return

doit = PythonOperator(
    task_id='doit',
    provide_context=True,
    python_callable=python_method,
    dag=dag)
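Outside of Airflow you can mimic what provide_context=True does: the scheduler builds a context dict and calls your python_callable with it as keyword arguments. A minimal sketch with a hand-built context standing in for the one Airflow assembles per task run:

```python
import datetime

def python_method(ds, **kwargs):
    # kwargs carries the rest of the context Airflow would pass in.
    return kwargs["execution_date"]

# Hand-built stand-in for Airflow's task context (illustrative values).
context = {
    "ds": "2019-12-05",
    "execution_date": datetime.datetime(2019, 12, 5),
}

result = python_method(**context)
print(result)  # 2019-12-05 00:00:00
```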
I'm not sure how to do it with the BashOperator, but you might start with this issue: https://github.com/airbnb/airflow/issues/775