Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to pass multiple parameters to Azure Durable Activity Function

My orchestrator receives a payload, with that payload it contains instructions that need to be passed along with other sets of data to activity functions.

how do I pass multiple parameters to an activity function? Or do I have to mash all my data together?

def orchestrator_function(context: df.DurableOrchestrationContext):
    
    # User defined configuration
    instructions: str = context.get_input()

    task_batch = yield context.call_activity("get_tasks", None)
    
    # Need to pass in instructions too
    parallel_tasks = [context.call_activity("perform_task", task) for task in task_batch]

    results = yield context.task_all(parallel_tasks)
    
    return results

The perform_task activity needs both the items from task_batch and the user input instructions

Do I do something in my function.json?

Workaround Not ideal, but I can pass multiple parameters as a single Tuple

something = yield context.call_activity("activity", ("param_1", "param_2"))

I then just need to reference the correct index of the parameter in the activity.

like image 572
Ari Avatar asked Jan 24 '23 14:01

Ari


2 Answers

Seems there's no text-book way to do it. I have opted to give my single parameter a generic name like parameter or payload.

Then when passing in the value in the orchestrator I do it like so:

payload = {"value_1": some_var, "value_2": another_var}
something = yield context.call_activity("activity", payload)

then within the activity function, I unpack it again.

edit: Some buried documentation seems to show that https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-error-handling?tabs=python

like image 169
Ari Avatar answered Jan 27 '23 03:01

Ari


Just to add to @Ari's great answer, here code to pass data from client function (HTTP request in this case) all the way to activity function:

Client -> Orchestrator -> Activity

Client

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)

    req_data = req.get_json()
    img_url = req_data['img_url']
    payload = {"img_url": img_url}
    
    instance_id = await client.start_new(req.route_params["functionName"], None, payload)

    logging.info(f"Started orchestration with ID = '{instance_id}'.")

    return client.create_check_status_response(req, instance_id)

Orchestrator

def orchestrator_function(context: df.DurableOrchestrationContext):
    input_context = context.get_input()
    img_url = input_context.get('img_url')

    some_response= yield context.call_activity('MyActivity', img_url)
    
    return [some_response]

Activity

def main(imgUrl: str) -> str:
    print(f'.... Image URL = {imgUrl}')

    return imgUrl
like image 39
gary Avatar answered Jan 27 '23 02:01

gary