 

Using Python string substitution and xcom_pull in Airflow HttpSensor operator

Tags:

airflow

I have a use case where I'm inside a for loop and need to populate fields in an HttpSensor task dynamically.

I tried using this syntax:

Method 1 FAILED:

s = 'sensor_task_sd_{0}'.format(d)
sensor_task_sd = HttpSensor(
    task_id=s,
    http_conn_id='ss_api',
    endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids={})}}/status?rev=0".format(t),
    request_params={'X-Requested-By': 'abc_123'},
    response_check=lambda response: "FINISHED" in response.text,
    poke_interval=10,
    soft_fail=True,
    timeout=600,
    dag=dag_subdag,
)

It fails because of this line:

endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids={})}}/status?rev=0".format(t)

I cannot get the Python string substitution with .format(t) to work.
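Printing the result of .format(t) shows what Airflow actually receives from Method 1. A minimal sketch in plain Python (no Airflow needed); the value of t is a hypothetical stand-in borrowed from the task id hard-coded in Method 2:

```python
# Hypothetical task id, borrowed from the hard-coded value in Method 2.
t = "start_pipeline_sd_campaignhistory"

# Method 1's endpoint template, exactly as written above.
template = "/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids={})}}/status?rev=0"

endpoint = template.format(t)
print(endpoint)
# prints: /rest/v1/pipeline/{ti.xcom_pull(key='curr_ss_pipe_id', task_ids=start_pipeline_sd_campaignhistory)}/status?rev=0
# .format() collapsed {{ and }} into single braces, so no Jinja {{ ... }}
# expression is left for Airflow to render, and the task id is unquoted.
```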

If I hard-code the value instead, it works. For example, the code below works fine:

Method 2 SUCCESS:

s = 'sensor_task_sd_{0}'.format(d)
sensor_task_sd = HttpSensor(
    task_id=s,
    http_conn_id='ss_api',
    endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids='start_pipeline_sd_campaignhistory')}}/status?rev=0",
    request_params={'X-Requested-By': 'abc_123'},
    response_check=lambda response: "FINISHED" in response.text,
    poke_interval=10,
    soft_fail=True,
    timeout=600,
    dag=dag_subdag,
)

I just don't understand this part. I've tried every combination of tricks I can think of, but it just doesn't accept the string interpolation I'm using to keep the code dynamic.

So my question is very simple:

How can I make the HttpSensor operator dynamic? I don't want to hard-code my function values in the endpoint string (Method 2 style); I would like to use values that are set at run time (Method 1 style).

How can I make Method 1 work?

banditKing asked Jan 27 '23 07:01


1 Answer

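Airflow uses Jinja to template its strings, and when you mix Jinja templating with Python formatting, you need to "escape" the curly brackets Jinja needs so that Python formatting doesn't consume them. Python's .format() turns every {{ into { and every }} into }, so after formatting, Jinja no longer sees a {{ ... }} expression. You escape them by doubling up every curly bracket that isn't part of the .format() placeholder, so {{ becomes {{{{ and }} becomes }}}}.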

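This should give you the results you need. Note the quotes around the placeholder: Jinja has to receive the task id as a string literal, otherwise it treats the bare name as an (undefined) template variable.

endpoint = "/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids='{}') }}}}/status?rev=0".format(t)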
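A minimal, stdlib-only sketch of the escaped template inside the question's for loop. The task ids in the list are hypothetical, and the placeholder is quoted ('{}') so that Jinja receives each task id as a string literal:

```python
# Hypothetical upstream task ids; in the real DAG they come from the for loop.
upstream_task_ids = ["start_pipeline_sd_campaignhistory", "start_pipeline_sd_other"]

endpoints = []
for t in upstream_task_ids:
    # {{{{ and }}}} come out of .format() as {{ and }} for Jinja;
    # '{}' is the only placeholder .format() fills in.
    endpoint = "/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids='{}') }}}}/status?rev=0".format(t)
    endpoints.append(endpoint)

print(endpoints[0])
# prints: /rest/v1/pipeline/{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids='start_pipeline_sd_campaignhistory') }}/status?rev=0
```

Each string would then be passed as endpoint= to that iteration's HttpSensor. The .format() call runs when the DAG file is parsed, while Airflow renders the remaining {{ ... }} part when the task runs, which is when the XCom value is available.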

Incidentally, in my experience, using f-strings (Python 3.6+) or named format parameters, where you can, makes the code much clearer when mixing the two in an Airflow script. You'll still need to "escape" the curly brackets, though.

f-strings:

endpoint = f"/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids='{t}') }}}}/status?rev=0"

Named format parameters:

endpoint = "/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids='{task_id}') }}}}/status?rev=0".format(task_id=t)
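A quick way to confirm that the different spellings agree is to build the same endpoint each way and compare. A minimal sketch in plain Python with a hypothetical task id. It also includes old-style % formatting, which is not part of the answer above but sidesteps the escaping entirely, because % formatting does not treat curly braces specially:

```python
t = "start_pipeline_sd_campaignhistory"  # hypothetical task id

# str.format with a positional placeholder
positional = "/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids='{}') }}}}/status?rev=0".format(t)

# str.format with a named placeholder
named = "/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids='{task_id}') }}}}/status?rev=0".format(task_id=t)

# f-string (Python 3.6+): braces are escaped exactly the same way
fstring = f"/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids='{t}') }}}}/status?rev=0"

# %-formatting only interprets %, so Jinja's {{ }} are written as-is
percent = "/rest/v1/pipeline/{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids='%s') }}/status?rev=0" % t

# All four produce the same Jinja template string.
assert positional == named == fstring == percent
print(positional)
```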

Hope that helps :)

Liam Clarke answered Jan 28 '23 19:01