Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

execution_date in airflow: need to access as a variable

Tags:

airflow

I am really a newbie in this forum. But I have been playing with airflow, for sometime, for our company. Sorry if this question sounds really dumb.

I am writing a pipeline using bunch of BashOperators. Basically, for each Task, I want to simply call a REST api using 'curl'

This is what my pipeline looks like(very simplified version):

from airflow import DAG from airflow.operators import BashOperator, PythonOperator from dateutil import tz import datetime  datetime_obj = datetime.datetime  default_args = {     'owner': 'airflow',     'depends_on_past': False,     'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),     'email': ['[email protected]'],     'email_on_failure': True,     'email_on_retry': False,     'retries': 2,     'retry_delay': datetime.timedelta(minutes=5), }   current_datetime = datetime_obj.now(tz=tz.tzlocal())  dag = DAG(     'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))  curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'   t1 = BashOperator(     task_id='rest-api-1',     bash_command=curl_cmd,     dag=dag) 

If you notice I am doing current_datetime= datetime_obj.now(tz=tz.tzlocal()) Instead what I want here is 'execution_date'

How do I use 'execution_date' directly and assign it to a variable in my python file?

I have having this general issue of accessing args. Any help will be genuinely appreciated.

Thanks

like image 942
Roger Avatar asked Apr 19 '16 22:04

Roger


People also ask

How do you use variables in Airflow?

Variables in Airflow are a generic way to store and retrieve arbitrary content or settings as a simple key-value store within Airflow. Variables can be listed, created, updated, and deleted from the UI (Admin -> Variables), code, or CLI. In addition, JSON settings files can be bulk uploaded through the UI.

What is the execution date in Airflow?

The execution time in Airflow is not the actual run time, but rather the start timestamp of its schedule period. For example, the execution time of the first DAG run is 2019–12–05 7:00:00, though it is executed on 2019–12–06.


2 Answers

The BashOperator's bash_command argument is a template. You can access execution_date in any template as a datetime object using the execution_date variable. In the template, you can use any jinja2 methods to manipulate it.

Using the following as your BashOperator bash_command string:

# pass in the first of the current month some_command.sh {{ execution_date.replace(day=1) }}  # last day of previous month some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }} 

If you just want the string equivalent of the execution date, ds will return a datestamp (YYYY-MM-DD), ds_nodash returns same without dashes (YYYYMMDD), etc. More on macros is available in the Api Docs.


Your final operator would look like:

command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals() t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag) 
like image 86
Erik Schuchmann Avatar answered Sep 21 '22 10:09

Erik Schuchmann


The PythonOperator constructor takes a 'provide_context' parameter (see https://pythonhosted.org/airflow/code.html). If it's True, then it passes a number of parameters into the python_callable via kwargs. kwargs['execution_date'] is what you want, I believe.

Something like this:

def python_method(ds, **kwargs):     Variable.set('execution_date', kwargs['execution_date'])     return  doit = PythonOperator(     task_id='doit',     provide_context=True,     python_callable=python_method,     dag=dag) 

I'm not sure how to do it with the BashOperator, but you might start with this issue: https://github.com/airbnb/airflow/issues/775

like image 31
Ziggy Eunicien Avatar answered Sep 23 '22 10:09

Ziggy Eunicien