Go worker pool while limiting number of goroutines and timeout for calculations

Tags:

go

I have a function that should start at most N goroutines. Each goroutine reads from the jobs channel and does some calculations. The caveat is that if a calculation takes longer than X amount of time, it should be abandoned so the worker can move on to the next one.

func doStuff(){
    rules := []string{
        "a",
        "b",
        "c",
        "d",
        "e",
        "f",
        "g",
    }
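    var (
        jobs    = make(chan []string, len(rules))
        res     = make(chan string, len(rules)) // string, to agree with matches below
        matches []string
    )

    // w is the worker: it drains jobs and sends each match to results.
    w := func(jobs <-chan []string, results chan<- string) {
        for j := range jobs {
            k, id := j[0], j[1]
            if id == "c" || id == "e" {
                // Simulate a calculation that takes too long.
                time.Sleep(time.Second * 5)
            }
            m := match(k, id)
            results <- m
        }
    }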
    N := 2
    for i := 0; i < N; i++ {
        go w(jobs, res)
    }

    for _, rl := range rules {
        jobs <- []string{"a", rl}
    }
    close(jobs)

    for i := 0; i < len(rules); i++ {
        select {
        case match := <-res:
            matches = append(matches, match)
        case <-time.After(time.Second):
        }
    }
    fmt.Println(matches)
}

The expected result is:

[a, b, d, f, g]

But what I'm getting is:

[a, b, d]

It seems reading from the results channel ends before one of the goroutines can fully finish due to the sleep. So I added a context with deadline, but now it hangs indefinitely:

    w := func(jobs <-chan []string, results chan<- string) {
        for j := range jobs {
            ctx, c := context.WithDeadline(context.Background(), time.Now().Add(time.Second*2))
            defer c()
            k, id := j[0], j[1]
            if id == "c" || id == "e" {
                time.Sleep(time.Second * 5)
            }
            m := match(k, id)
            select {
            case res <- m:
            case <-ctx.Done():
                fmt.Println("Canceled by timeout")
                continue
            }
        }
    }

I've read other questions regarding completely killing off a goroutine if something times out, but couldn't find anything on skipping if something times out.

namenamename asked Nov 01 '25 15:11



1 Answer

I made a package for use cases just like this one. Please see this repository: github.com/MicahParks/ctxerrgroup.

Here's a full example of how your code would look using the package and streaming the results. The streaming approach is more memory efficient. The original approach held all of the results in memory before printing them at the end.

package main

import (
    "context"
    "log"
    "time"

    "github.com/MicahParks/ctxerrgroup"
)

func main() {

    // The number of worker goroutines to use.
    workers := uint(2)

    // Create an error handler that logs all errors.
    //
    // The original work item didn't return an error, so this is not required.
    var errorHandler ctxerrgroup.ErrorHandler
    errorHandler = func(_ ctxerrgroup.Group, err error) {
        log.Printf("A job in the worker pool failed.\nError: %s", err.Error())
    }

    // Create the group of workers.
    group := ctxerrgroup.New(workers, errorHandler)

    // Create the question specific assets.
    rules := []string{
        "a",
        "b",
        "c",
        "d",
        "e",
        "f",
        "g",
    }
    results := make(chan bool)

    // Create a parent timeout.
    timeout := time.Second
    parentTimeout, parentCancel := context.WithTimeout(context.Background(), timeout)
    defer parentCancel()

    // Iterate through all the rules to use.
    for _, rule := range rules {

        // Deliberately shadow the rule so the closure below doesn't see a later iteration's value.
        // This must happen outside the closure. It is needed before Go 1.22 and harmless afterwards.
        rule := rule

        // Create a child context for this specific work item.
        ctx, cancel := context.WithCancel(parentTimeout)

        // Create and add the work item.
        group.AddWorkItem(ctx, cancel, func(workCtx context.Context) (err error) {

            // Do the work using the workCtx.
            results <- match(workCtx, "a", rule)

            return nil
        })
    }

    // Launch a goroutine that will close the results channel when everyone is finished.
    go func() {
        group.Wait()
        close(results)
    }()

    // Print the matches as they happen. This will not hang.
    for result := range results {
        log.Println(result)
    }

    // Wait for the group to finish.
    //
    // This is not required, but doesn't hurt as group.Wait is idempotent. It's here in case you remove the goroutine
    // waiting and closing the channel above.
    group.Wait()
}

// match is a function from the original question. It now accepts and properly uses the context argument.
func match(ctx context.Context, key, id string) bool {
    panic("implement me")
}
Micah Parks answered Nov 03 '25 10:11
