I have an existing Function App with 2 Functions and a storage queue. F1 is triggered by a message in a service bus topic. For each msg received, F1 calculates some sub-tasks (T1,T2,...) which have to be executed with varying amount of delay. Ex - T1 to be fired after 3 mins, T2 after 5min etc. F1 posts messages to a storage queue with appropriate visibility timeouts (to simulate the delay) and F2 is triggered whenever a message is visible in the queue. All works fine.
I now want to migrate this app to use 'Durable Functions'. F1 now only starts the orchestrator. The orchestrator code is something as follows -
public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
List<Task> tasks = new List<Task>();
foreach (var value in results)
{
var pnTask = context.CallActivityAsync("PerformSubTask", value);
tasks.Add(pnTask);
}
//dont't await as we want to fire and forget. No fan-in!
//await Task.WhenAll(tasks);
}
[FunctionName("PerformSubTask")]
public async static Task Run([ActivityTrigger]TaskInfo info, TraceWriter log)
{
TimeSpan timeDifference = DateTime.UtcNow - info.Origin.ToUniversalTime();
TimeSpan delay = TimeSpan.FromSeconds(info.DelayInSeconds);
var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;
//will still keep the activity function running and incur costs??
await Task.Delay(actualDelay);
//perform subtask work after delay!
}
I would only like to fan-out (no fan-in to collect the results) and start the subtasks. The orchestrator starts all the tasks and avoids call 'await Task.WhenAll'. The activity function calls 'Task.Delay' to wait for the specified amount of time and then does its work.
My questions
Does it make sense to use Durable Functions for this workflow?
Is this the right approach to orchestrate 'Fan-out' workflow?
I do not like the fact that the activity function is running for specified amount of time (3 or 5 mins) doing nothing. It will incurs costs,or?
Also if a delay of more than 10 minutes is required there is no way for an activity function to succeed with this approach!
My earlier attempt to avoid this was to use 'CreateTimer' in the orchestrator and then add the activity as a continuation, but I see only timer entries in the 'History' table. The continuation does not fire! Am I violating the constraint for orchestrator code - 'Orchestrator code must never initiate any async operation' ?
foreach (var value in results)
{
//calculate time to start
var timeToStart = ;
var pnTask = context.CreateTimer(timeToStart , CancellationToken.None).ContinueWith(t => context.CallActivityAsync("PerformSubTask", value));
tasks.Add(pnTask);
}
UPDATE : using approach suggested by Chris
Activity that calculates subtasks and delays
[FunctionName("CalculateTasks")]
public static List<TaskInfo> CalculateTasks([ActivityTrigger]string input,TraceWriter log)
{
//in reality time is obtained by calling an endpoint
DateTime currentTime = DateTime.UtcNow;
return new List<TaskInfo> {
new TaskInfo{ DelayInSeconds = 10, Origin = currentTime },
new TaskInfo{ DelayInSeconds = 20, Origin = currentTime },
new TaskInfo{ DelayInSeconds = 30, Origin = currentTime },
};
}
public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
var currentTime = context.CurrentUtcDateTime;
List<Task> tasks = new List<Task>();
foreach (var value in results)
{
TimeSpan timeDifference = currentTime - value.Origin;
TimeSpan delay = TimeSpan.FromSeconds(value.DelayInSeconds);
var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;
var timeToStart = currentTime.Add(actualDelay);
Task delayedActivityCall = context
.CreateTimer(timeToStart, CancellationToken.None)
.ContinueWith(t => context.CallActivityAsync("PerformSubtask", value));
tasks.Add(delayedActivityCall);
}
await Task.WhenAll(tasks);
}
Simply scheduling tasks from within the orchestrator seems to work.In my case I am calculating the tasks and the delays in another activity (CalculateTasks) before the loop. I want the delays to be calculated using the 'current time' when the activity was run. I am using DateTime.UtcNow in the activity. This somehow does not play well when used in the orchestrator. The activities specified by 'ContinueWith' just don't run and the orchestrator is always in 'Running' state.
Can I not use the time recorded by an activity from within the orchestrator?
UPDATE 2
So the workaround suggested by Chris works!
Since I do not want to collect the results of the activities I avoid calling 'await Tasks.WhenAll(tasks)
' after scheduling all activities. I do this in order to reduce the contention on the control queue i.e. be able to start another orchestration if reqd. Nevertheless the status of the 'orchestrator' is still 'Running' till all the activities finish running. I guess it moves to 'Complete' only after the last activity posts a 'done' message to the control queue.
Am I right? Is there any way to free the orchestrator earlier i.e right after scheduling all activities?
The ContinueWith
approach worked fine for me. I was able to simulate a version of your scenario using the following orchestrator code:
[FunctionName("Orchestrator")]
public static async Task Orchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context,
TraceWriter log)
{
var tasks = new List<Task>(10);
for (int i = 0; i < 10; i++)
{
int j = i;
DateTime timeToStart = context.CurrentUtcDateTime.AddSeconds(10 * j);
Task delayedActivityCall = context
.CreateTimer(timeToStart, CancellationToken.None)
.ContinueWith(t => context.CallActivityAsync("PerformSubtask", j));
tasks.Add(delayedActivityCall);
}
await Task.WhenAll(tasks);
}
And for what it's worth, here is the activity function code.
[FunctionName("PerformSubtask")]
public static void Activity([ActivityTrigger] int j, TraceWriter log)
{
log.Warning($"{DateTime.Now:o}: {j:00}");
}
From the log output, I saw that all activity invocations ran 10 seconds apart from each other.
Another approach would be to fan out to multiple sub-orchestrations (like @jeffhollan suggested) which are simple a short sequence of a durable timer delay and your activity call.
UPDATE I tried using your updated sample and was able to reproduce your problem! If you run locally in Visual Studio and configure the exception settings to always break on exceptions, then you should see the following:
System.InvalidOperationException: 'Multithreaded execution was detected. This can happen if the orchestrator function code awaits on a task that was not created by a DurableOrchestrationContext method. More details can be found in this article https://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay#orchestrator-code-constraints.'
This means the thread which called context.CallActivityAsync("PerformSubtask", j)
was not the same as the thread which called the orchestrator function. I don't know why my initial example didn't hit this, or why your version did. It has something to do with how the TPL decides which thread to use to run your ContinueWith
delegate - something I need to look more into.
The good news is that there is a simple workaround, which is to specify TaskContinuationOptions.ExecuteSynchronously, like this:
Task delayedActivityCall = context
.CreateTimer(timeToStart, CancellationToken.None)
.ContinueWith(
t => context.CallActivityAsync("PerformSubtask", j),
TaskContinuationOptions.ExecuteSynchronously);
Please try that and let me know if that fixes the issue you're observing.
Ideally you wouldn't need to do this workaround when using Task.ContinueWith
. I've opened an issue in GitHub to track this: https://github.com/Azure/azure-functions-durable-extension/issues/317
Since I do not want to collect the results of the activities I avoid calling
await Tasks.WhenAll(tasks)
after scheduling all activities. I do this in order to reduce the contention on the control queue i.e. be able to start another orchestration if reqd. Nevertheless the status of the 'orchestrator' is still 'Running' till all the activities finish running. I guess it moves to 'Complete' only after the last activity posts a 'done' message to the control queue.
This is expected. Orchestrator functions never actually complete until all outstanding durable tasks have completed. There isn't any way to work around this. Note that you can still start other orchestrator instances, there just might be some contention if they happen to land on the same partition (there are 4 partitions by default).
await Task.Delay
is definitely not the best option: you will pay for this time while your function won't do anything useful. The max delay is also bound to 10 minutes on Consumption plan.
In my opinion, raw Queue messages are the best option for fire-and-forget scenarios. Set the proper visibility timeouts, and your scenario will work reliably and efficiently.
The killer feature of Durable Functions are await
s, which do their magic of pausing and resuming while keeping the scope. Thus, it's a great way to implement fan-in, but you don't need that.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With