Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Invoking the same activity inside a loop in cadence workflow

I have a question in cadence workflow as to , can we invoke the same activity with different inputs inside a for loop ? Will that code be deterministic? Will cadence be able to replay the events when it re-constructs the workflow, if the worker executing the workflow is stopped during the execution and restarted later.

For example, I have the following code.

   func init() {
    workflow.RegisterWithOptions(SampleWorkFlow, workflow.RegisterOptions{Name: "SampleWorkFlow"})
    activity.RegisterWithOptions(SampleActivity, activity.RegisterOptions{Name: "SampleActivity"})
    activity.RegisterWithOptions(SecondActivity, activity.RegisterOptions{Name: "SecondActivity"})
}

// SampleWorkFlow comment
func SampleWorkFlow(ctx workflow.Context, input string) error {

    fmt.Println("Workflow started")
    ctx = workflow.WithTaskList(ctx, sampleTaskList)
    ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)

    var result string
    err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)
    if err != nil {
        return err
    }

    for i := 1; i <= 10; i++ {
        value := i
        workflow.Go(ctx, func(ctx workflow.Context) {
            err := workflow.ExecuteActivity(ctx, "SecondActivity", input, value).Get(ctx, &result)
            if err != nil {
                log.Println("err=", err)
            }
        })
    }

    return nil

}

// SampleActivity comment
func SampleActivity(ctx context.Context, value, v1 string) (string, error) {
    fmt.Println("Sample activity start")
    for i := 0; i <= 10; i++ {
        fmt.Println(i)
    }
    return "Hello " + value, nil
}

// SecondActivity comment
func SecondActivity(ctx context.Context, value int) (string, error) {

    fmt.Println("Second  activity start")

    fmt.Println("value=", value)
    fmt.Println("Second activity going to end")
    return "Hello " + fmt.Sprintf("%d", value), nil
}

Here, the Second activity is invoked parallely inside a for loop. My first question is , is this code deterministic ?

Let's say after 5 iterations of the loop, when i =5, the worker executing this workflow terminates, will cadence be able to replay the events if the workflow is started in another worker ?

Can you please answer my question ?

like image 351
Prakash Premkumar Avatar asked Mar 01 '26 00:03

Prakash Premkumar


1 Answers

Yes, this code is deterministic. It doesn't call any non-deterministic operations (like random or UUID generation) and uses workflow.Go to start a goroutine. So it is deterministic. The complexity of the code doesn't play a role in defining its determinism.

Unrelated nit. There is no need to use a goroutine in your sample as ExecuteActivity call is already non-blocking by returning a Future. So the sample can be simplified to:

func SampleWorkFlow(ctx workflow.Context, input string) error {

    fmt.Println("Workflow started")
    ctx = workflow.WithTaskList(ctx, sampleTaskList)
    ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)

    var result string
    err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)
    if err != nil {
        return err
    }

    for i := 1; i <= 10; i++ {
       workflow.ExecuteActivity(ctx, "SecondActivity", input, i)
    }
    return nil
}

Note that this sample is still might execute not the way you expect as it completes workflow without waiting for the activities completion. So these activities are not even going to start.

Here is the code that is going to wait for the activities to complete:

func SampleWorkFlow(ctx workflow.Context, input string) error {

    fmt.Println("Workflow started")
    ctx = workflow.WithTaskList(ctx, sampleTaskList)
    ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)

    var result string
    err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)
    if err != nil {
        return err
    }
    var results []workflow.Future
    for i := 1; i <= 10; i++ {
        future := workflow.ExecuteActivity(ctx, "SecondActivity", input, i)
        results = append(results, future)
    }
    for i := 0; i < 10; i++ {
        var result string
        err := results[i].Get(ctx, &result)
        if err != nil {
            log.Println("err=", err)
        }
    }
    return nil
}
like image 154
Maxim Fateev Avatar answered Mar 04 '26 00:03

Maxim Fateev



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!