I have a use-case where Im inside a for loop and need to populate fields in an HttpSensor task dynamically
I tried using this syntax:
Method 1 FAILED:
s = 'sensor_task_sd_{0}'.format(d)
sensor_task_sd = HttpSensor(
task_id=s,
http_conn_id='ss_api',
endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids={})}}/status?rev=0".format(t),
request_params={'X-Requested-By': 'abc_123'},
response_check=lambda response: True if "FINISHED" in response.text else False,
poke_interval=10,
soft_fail=True,
timeout=600,
dag=dag_subdag,
)
but it fails because in this line:
endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids={})}}/status?rev=0".format(t)
I cannot make the python string substitution with .format(t) work.
Instead if I hard code some value the above code works... for example, the code below works fine:
Method 2 SUCCESS:
s = 'sensor_task_sd_{0}'.format(d)
sensor_task_sd = HttpSensor(
task_id=s,
http_conn_id='ss_api',
endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids='start_pipeline_sd_campaignhistory')}}/status?rev=0",
request_params={'X-Requested-By': 'abc_123'},
response_check=lambda response: True if "FINISHED" in response.text else False,
poke_interval=10,
soft_fail=True,
timeout=600,
dag=dag_subdag)
I just donot understand this bit..... Ive tried every combination of tricks to make it work, it just doesnt take the string interpolations which Iam using to keep the code dynamic.
so my question is very simple:
How can I make the HttpSensor operator dynamic? I donot want to hard code my function values within the endpoint string(Method 2 style), I would like to use values that are set at run time (Method 1 style).
How can I make Method 1 work?
So Airflow uses Jinja to template its strings, and when you mix Jinja templating and Python formatting, you need to "escape" the curly brackets Jinja needs so that Python formatting doesn't consume them. You do so by doubling up every curly bracket that isn't for the .format()
call.
This should give you the results you need.
endpoint = "/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids={}) }}}}/status?rev=0".format(t)
Incidentally, in my experience using f-strings (Python 3.6+) or named formatting parameters if you're able can really help code clarity when mixing the two in an Airflow script. But you'll still need to 'escape' the curly brackets.
f-strings:
endpoint = f"/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids={t})}} }}/status?rev=0"
Named format parameters:
endpoint = "/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids={task_id}) }}}}/status?rev=0".format(task_id=t)
Hope that helps :)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With