In brief, our task is to process a large number of input messages.
To solve this, we decided to use Azure Queue Storage and Azure Functions. Our Azure Functions are structured similarly to the following code:
Queue-triggered function:
[FunctionName("MessageControllerExecutor")]
public static async void Run(
    [QueueTrigger(QUEUE_NAME, Connection = QUEUE_CONNECTION_NAME)] string queueMessage,
    [OrchestrationClient] DurableOrchestrationClient client,
    TraceWriter log)
{
    await client.StartNewAsync("MessageController", queueMessage);
}
Durable function:
[FunctionName("MessageController")]
public static async void Run(
    [OrchestrationTrigger] DurableOrchestrationContext context,
    TraceWriter log)
{
    if (!context.IsReplaying) log.Warning("MessageController started");
    var function1ResultTask = context.CallActivityAsync<ResultMessage>("Function_1", new InputMessage());
    var function2ResultTask = context.CallActivityAsync<ResultMessage>("Function_2", new InputMessage());
    await Task.WhenAll(function1ResultTask, function2ResultTask);
    // process Function_1 and Function_2 results
    // ...
}
Simple activity function sample:
[FunctionName("Function_1")]
public static ResultMessage Run(
    [ActivityTrigger] DurableActivityContext activityContext,
    TraceWriter log)
{
    var msg = activityContext.GetInput<InputMessage>();
    int time = new Random().Next(1, 3);
    Thread.Sleep(time * 1000);
    return new ResultMessage()
    {
        Payload = $"Function_1 slept for {time} sec"
    };
}
MessageControllerExecutor is triggered when a new item is received in a queue. MessageController is a Durable Function that uses a few simple activity functions to process each message.
When we push messages to the queue, the MessageControllerExecutor function starts immediately, asynchronously starts MessageController, and passes it the message, so this part works as expected.
However, we ran into a problem: not all MessageController function instances run.
For example, we pushed 100 messages into the queue, but only about 10-20% of them were processed by MessageController.
Some messages were not processed at all, or were processed only after a long delay. It looks like the durable functions failed to start, though no exceptions were thrown.
We have a few questions:
You elided some code from your orchestrator function in the question above for brevity, which I understand, but what exactly are you doing there after the await Task.WhenAll(...)? If it includes any kind of significant processing, you should really farm that out to a third activity function (e.g. Function_3) and then simply return the results from the orchestration function.
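Something along these lines is what I have in mind. This is only a sketch against the Durable Functions 1.x API used in the question: the body of Function_3, the ResultMessage[] input, and the minimal InputMessage/ResultMessage classes are my assumptions, since the question does not show them.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;

// Assumed minimal shapes; the real classes are not shown in the question.
public class InputMessage { }
public class ResultMessage { public string Payload { get; set; } }

public static class MessageControllerSketch
{
    [FunctionName("MessageController")]
    public static async Task<ResultMessage> Run(
        [OrchestrationTrigger] DurableOrchestrationContext context,
        TraceWriter log)
    {
        // Fan out to the two activities, as in the question.
        var function1ResultTask = context.CallActivityAsync<ResultMessage>("Function_1", new InputMessage());
        var function2ResultTask = context.CallActivityAsync<ResultMessage>("Function_2", new InputMessage());
        ResultMessage[] results = await Task.WhenAll(function1ResultTask, function2ResultTask);

        // Hand the post-processing to a third activity instead of doing it here,
        // then just return whatever that activity produced.
        return await context.CallActivityAsync<ResultMessage>("Function_3", results);
    }

    [FunctionName("Function_3")]
    public static ResultMessage Function3(
        [ActivityTrigger] DurableActivityContext activityContext,
        TraceWriter log)
    {
        var results = activityContext.GetInput<ResultMessage[]>();
        // ... the significant processing would go here (hypothetical) ...
        return new ResultMessage { Payload = $"Processed {results.Length} results" };
    }
}
```

The point of the split is that the orchestrator only schedules work and awaits it, while anything heavy runs inside an activity.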
Update: I just noticed your functions are defined as async void. If I had to guess, this would actually cause a problem for the runtime. Can you try changing it to async Task and see if your problem goes away? As a general rule, defining methods as async void is frowned upon in .NET.
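To show the general .NET behavior behind that advice, here is a small self-contained sketch with no Azure dependencies. StartNewAsync below is a hypothetical stand-in for client.StartNewAsync, and the class and flag names are made up for the demo. It does not prove what the Functions runtime does; it only shows that a caller has nothing to wait on when a method is async void.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Flags set once the awaited work has actually finished.
    public static volatile bool VoidDone;
    public static volatile bool TaskDone;

    // Hypothetical stand-in for client.StartNewAsync(...).
    private static async Task StartNewAsync()
    {
        await Task.Delay(100);
    }

    // async void: control returns to the caller at the first incomplete await,
    // and the caller gets no Task to observe completion or exceptions.
    public static async void RunVoid()
    {
        await StartNewAsync();
        VoidDone = true;
    }

    // async Task: the caller can await the returned Task.
    public static async Task RunTask()
    {
        await StartNewAsync();
        TaskDone = true;
    }

    public static async Task Main()
    {
        RunVoid();       // comes back before the 100 ms delay has elapsed
        Console.WriteLine($"after RunVoid: done = {VoidDone}");

        await RunTask(); // resumes only after the work has finished
        Console.WriteLine($"after RunTask: done = {TaskDone}");
    }
}
```

With async Task the caller (here Main, in your case the Functions host) can tell when the call to start the orchestration has really completed; with async void it cannot.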
An addition to Drew's answer: you should not use Thread.Sleep(). As the documentation states, use the CreateTimer API instead.
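A rough sketch of what that could look like with the Durable Functions 1.x API from the question. As far as I know, CreateTimer is exposed on the orchestration context, so this sketch puts the delay in an orchestrator rather than in the activity; the function name and the fixed two-second delay are assumptions for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

public static class TimerSketch
{
    // Hypothetical orchestrator name, not from the question.
    [FunctionName("MessageControllerWithDelay")]
    public static async Task Run(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        // Build the deadline from context.CurrentUtcDateTime rather than
        // DateTime.UtcNow so the value is the same on every replay.
        DateTime fireAt = context.CurrentUtcDateTime.AddSeconds(2);

        // Durable timer: the orchestration waits here without blocking a thread.
        await context.CreateTimer(fireAt, CancellationToken.None);

        // ... continue with activity calls after the delay ...
    }
}
```

If the wait really has to stay inside an activity such as Function_1, making the activity async and awaiting Task.Delay would at least avoid blocking a thread, though that is my suggestion and not something the documentation quoted above says.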