I am trying to implement an Azure Durable Functions workflow.
Every 6 minutes, an Azure TimerTrigger function calls an Orchestration function (OrchestrationTrigger), which in turn starts a number of activity functions (ActivityTrigger).
Sometimes, however, the Orchestration function gets called twice within a few seconds! This is a big problem because my activity functions are not idempotent!
Below is how my code is called.
TimerTriggered Function:
[FunctionName("StartupFunc")]
public static async Task Run([TimerTrigger("0 */6 * * * *", RunOnStartup = true, UseMonitor = false)]TimerInfo myStartTimer, [OrchestrationClient] DurableOrchestrationClient orchestrationClient, TraceWriter log)
{
    List<OrchestrationModel> ExportModels = await getData();
    string id = await orchestrationClient.StartNewAsync("OrchestratorFunc", ExportModels);
}
Orchestration Function:
[FunctionName("OrchestratorFunc")]
public static async Task TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
    var dataList = context.GetInput<List<OrchestrationModel>>();
    var tasks = new List<Task>();
    foreach (var data in dataList)
    {
        // Fan out: schedule one TransformToSql activity per item.
        tasks.Add(context.CallActivityAsync<string>("TransformToSql", new TransformModel(data)));
    }
    // Fan in: wait until every activity has completed.
    await Task.WhenAll(tasks);
}
Activity function:
[FunctionName("TransformToSql")]
public static async Task<string> RunTransformation([ActivityTrigger] DurableActivityContext context, TraceWriter log)
{
    var model = context.GetInput<TransformModel>();
    // Do some work with model
    return "Done"; // placeholder result
}
This behaviour is expected: the orchestrator function running more than once is how Durable Functions works by design.
You have the following orchestration:
[FunctionName("OrchestratorFunc")]
public static async Task TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
    var dataList = context.GetInput<List<OrchestrationModel>>();
    var tasks = new List<Task>();
    foreach (var data in dataList)
    {
        // Fan out: schedule one TransformToSql activity per item.
        tasks.Add(context.CallActivityAsync<string>("TransformToSql", new TransformModel(data)));
    }
    // Fan in: wait until every activity has completed.
    await Task.WhenAll(tasks);
}
When an activity is called, control returns to the Dispatcher, an internal component of Durable Functions responsible for driving the flow of your orchestration. While it waits for the task to finish, the orchestration is temporarily unloaded. Once the task completes, the whole orchestrator function is replayed from the beginning until it reaches the next await that has not completed yet.
The important point is that, although the orchestrator is replayed, a completed activity is not executed again: its result is read from the orchestration history in storage and used instead. This is why the orchestrator function appears to be called more than once. To make things more explicit, consider the following example:
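The idea that a replayed orchestrator gets stored results back, instead of re-running the activity, can be illustrated with a toy model. This is a minimal, self-contained sketch and NOT the Durable Functions API: `ToyOrchestrationContext`, `ReplayDemo` and the counting `TransformToSql` body are hypothetical names, and the "history" here is just an in-memory list standing in for the history the real framework keeps in storage.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy context, not the Durable Functions API. It only mimics the
// idea that completed activity results are kept in a history and handed back,
// rather than re-executed, when the orchestrator function is replayed.
public class ToyOrchestrationContext
{
    private readonly List<string> _history; // stored activity results, in call order
    private int _position;                  // history entries consumed during this run

    public ToyOrchestrationContext(List<string> history)
    {
        _history = history;
    }

    public string CallActivity(Func<string> activity)
    {
        if (_position < _history.Count)
        {
            // Replay: the activity already ran, so return its stored result.
            return _history[_position++];
        }
        // First execution: run the activity and record its result.
        string result = activity();
        _history.Add(result);
        _position++;
        return result;
    }
}

public static class ReplayDemo
{
    public static int ActivityExecutions;

    // Hypothetical activity body with a side effect we can count.
    private static string TransformToSql()
    {
        ActivityExecutions++;
        return "rows written";
    }

    // The orchestrator body: it may run many times.
    private static string Orchestrator(ToyOrchestrationContext context)
    {
        return context.CallActivity(TransformToSql);
    }

    public static void Main()
    {
        var history = new List<string>(); // survives between orchestrator runs
        Orchestrator(new ToyOrchestrationContext(history)); // first run
        Orchestrator(new ToyOrchestrationContext(history)); // replay
        Console.WriteLine($"Activity executions: {ActivityExecutions}");
    }
}
```

Running it prints `Activity executions: 1`: the orchestrator body ran twice, but the non-idempotent "activity" ran only once.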
[FunctionName("OrchestratorFunc")]
public static async Task TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
    var dataList = context.GetInput<List<OrchestrationModel>>();
    foreach (var data in dataList)
    {
        // Awaiting an activity that has not completed yet lets the orchestrator be unloaded until it finishes.
        await context.CallActivityAsync<string>("TransformToSql1", new TransformModel(data));
        await context.CallActivityAsync<string>("TransformToSql2", new TransformModel(data));
    }
}
When TransformToSql1 is awaited, the orchestration is unloaded and the whole flow waits until that activity finishes. The orchestration is then replayed: it awaits TransformToSql1 once more, but since its result is already saved, that await completes immediately with the stored value and execution moves on to await TransformToSql2. The same process then repeats for TransformToSql2.
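The replay loop for the two sequential activities can be sketched with another toy model. It is a self-contained illustration under stated assumptions, not how the Dispatcher is actually implemented: all names (`ToyContext`, `NeedsActivityException`, `SequentialReplayDemo.Dispatch`) are hypothetical, and a thrown exception stands in for "the orchestrator awaited an activity that has not completed yet", whereas the real framework relies on awaited tasks and persisted history.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical signal: "the orchestrator needs an activity that has not run yet".
public class NeedsActivityException : Exception
{
    public string ActivityName { get; }
    public NeedsActivityException(string activityName) { ActivityName = activityName; }
}

// Hypothetical toy context, not the Durable Functions API.
public class ToyContext
{
    private readonly List<string> _history;
    private int _position;
    public ToyContext(List<string> history) { _history = history; }

    public string CallActivity(string name)
    {
        if (_position < _history.Count)
            return _history[_position++];       // already completed: replay the stored result
        throw new NeedsActivityException(name); // not completed: the orchestrator yields here
    }
}

public static class SequentialReplayDemo
{
    public static int OrchestratorRuns;
    public static readonly Dictionary<string, int> ActivityRuns = new Dictionary<string, int>();

    // Mirrors the example above: two sequential awaits.
    private static void Orchestrator(ToyContext context)
    {
        OrchestratorRuns++;
        context.CallActivity("TransformToSql1");
        context.CallActivity("TransformToSql2");
    }

    // Stand-in for executing an activity; counts how often each one runs.
    private static string RunActivity(string name)
    {
        ActivityRuns[name] = ActivityRuns.TryGetValue(name, out int n) ? n + 1 : 1;
        return name + " done";
    }

    // Toy dispatcher: run the orchestrator, execute whatever activity it is
    // waiting for, record the result, then replay the orchestrator from the start.
    public static void Dispatch()
    {
        var history = new List<string>();
        while (true)
        {
            try
            {
                Orchestrator(new ToyContext(history));
                return; // completed without needing new work
            }
            catch (NeedsActivityException pending)
            {
                history.Add(RunActivity(pending.ActivityName));
            }
        }
    }

    public static void Main()
    {
        Dispatch();
        Console.WriteLine($"Orchestrator runs: {OrchestratorRuns}");
        foreach (var entry in ActivityRuns)
            Console.WriteLine($"{entry.Key} executions: {entry.Value}");
    }
}
```

This reports 3 orchestrator runs but only 1 execution of each activity, which matches the behaviour described above: the orchestrator code is replayed, the activities are not.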