My orchestrator receives a payload that contains instructions, and those instructions need to be passed to activity functions along with other sets of data.
How do I pass multiple parameters to an activity function? Or do I have to mash all my data together?
def orchestrator_function(context: df.DurableOrchestrationContext):
    # User defined configuration
    instructions: str = context.get_input()
    task_batch = yield context.call_activity("get_tasks", None)
    # Need to pass in instructions too
    parallel_tasks = [context.call_activity("perform_task", task) for task in task_batch]
    results = yield context.task_all(parallel_tasks)
    return results
The perform_task activity needs both the items from task_batch and the user input instructions.
Do I do something in my function.json?
Workaround: not ideal, but I can pass multiple parameters as a single tuple:
something = yield context.call_activity("activity", ("param_1", "param_2"))
I then just need to reference the correct index of the parameter in the activity.
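To illustrate the indexing on the activity side, here is a minimal sketch. It assumes activity inputs are JSON-serialized on their way from the orchestrator, so the tuple would typically arrive as a list. The names `params` and `simulate_transit` are hypothetical, and the activity's parameter name would have to match the binding name in its function.json.

```python
import json


def main(params: list) -> str:
    # Activity body. The parameter name "params" is a hypothetical choice;
    # it is assumed to match the activityTrigger binding name in function.json.
    param_1, param_2 = params  # unpack by position
    return f"{param_1} / {param_2}"


def simulate_transit(value):
    # Hypothetical stand-in for the JSON round trip between orchestrator
    # and activity, used only to try the activity locally.
    return json.loads(json.dumps(value))


received = simulate_transit(("param_1", "param_2"))
print(type(received).__name__)  # list: JSON has no tuple type
print(main(received))
```

Positional unpacking works the same for a list or a tuple, so the activity does not need to care which one it receives, but the order of the values becomes an implicit contract between orchestrator and activity.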
It seems there's no textbook way to do it. I have opted to give my single parameter a generic name like parameter or payload.
Then when passing in the value in the orchestrator I do it like so:
payload = {"value_1": some_var, "value_2": another_var}
something = yield context.call_activity("activity", payload)
Then, within the activity function, I unpack it again.
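Applied to the question's fan-out, the dictionary approach might look like the sketch below. The keys `task` and `instructions` are hypothetical, and the activity is named `perform_task` here only for readability (in a function app it would normally be the `main` function of its own function folder). The `context` argument is assumed to be a `df.DurableOrchestrationContext`; the annotation is left out so the snippet can be read without the SDK installed.

```python
def perform_task(payload: dict) -> str:
    # Activity side: unpack the named values from the single input.
    task = payload["task"]
    instructions = payload["instructions"]
    return f"{instructions}: {task}"


def orchestrator_function(context):
    # User-defined configuration passed in by the client function.
    instructions = context.get_input()
    task_batch = yield context.call_activity("get_tasks", None)
    # Bundle each task with the shared instructions into one payload.
    parallel_tasks = [
        context.call_activity(
            "perform_task", {"task": task, "instructions": instructions}
        )
        for task in task_batch
    ]
    results = yield context.task_all(parallel_tasks)
    return results
```

Named keys make the activity less sensitive to ordering mistakes than the tuple variant, at the cost of slightly larger payloads.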
Edit: some buried documentation seems to show the same pattern: https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-error-handling?tabs=python
Just to add to @Ari's great answer, here is code to pass data from the client function (an HTTP request in this case) all the way to the activity function:
Client -> Orchestrator -> Activity
Client
import logging

import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    # Read the image URL from the JSON body of the HTTP request
    req_data = req.get_json()
    img_url = req_data['img_url']
    payload = {"img_url": img_url}
    # Third argument is the input handed to the orchestrator
    instance_id = await client.start_new(req.route_params["functionName"], None, payload)
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)
Orchestrator
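import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    # The payload sent by the client function
    input_context = context.get_input()
    img_url = input_context.get('img_url')
    some_response = yield context.call_activity('MyActivity', img_url)
    return [some_response]

main = df.Orchestrator.create(orchestrator_function)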
Activity
# The parameter name (imgUrl) must match the binding name in the activity's function.json
def main(imgUrl: str) -> str:
    print(f'.... Image URL = {imgUrl}')
    return imgUrl