How do I handle errors in a worker pool using WaitGroup?

I have a problem using sync.WaitGroup and select together. If you take a look at the following HTTP request pool, you will notice that an error is never reported: when no error is pending, select takes the default branch, wg.Wait() blocks, and nothing reads from the errors channel anymore.

package pool

import (
    "fmt"
    "log"
    "net/http"
    "sync"
)

var (
    MaxPoolQueue  = 100
    MaxPoolWorker = 10
)

type Pool struct {
    wg *sync.WaitGroup

    queue  chan *http.Request
    errors chan error
}

func NewPool() *Pool {
    return &Pool{
        wg: &sync.WaitGroup{},

        queue:  make(chan *http.Request, MaxPoolQueue),
        errors: make(chan error),
    }
}

func (p *Pool) Add(r *http.Request) {
    p.wg.Add(1)

    p.queue <- r
}

func (p *Pool) Run() error {
    for i := 0; i < MaxPoolWorker; i++ {
        go p.doWork()
    }

    select {
    case err := <-p.errors:
        return err
    default:
        p.wg.Wait()
    }

    return nil
}

func (p *Pool) doWork() {
    for r := range p.queue {
        fmt.Printf("Request to %s\n", r.Host)

        p.wg.Done()

        _, err := http.DefaultClient.Do(r)

        if err != nil {
            log.Println(err) // log.Fatal would exit the process before the send below

            p.errors <- err
        } else {
            fmt.Printf("no error\n")
        }
    }
}

Source can be found here

How can I still use WaitGroup but also get errors from goroutines?

bodokaiser asked Jul 12 '14 17:07



1 Answer

I found the answer myself while writing the question, and since I think it is an interesting case, I would like to share it.

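The trick to using sync.WaitGroup and a channel together is to take the select from Run:

select {
case err := <-p.errors:
    return err
default:
    p.wg.Wait()
}

and wrap it in a for loop:

for {
    select {
    case err := <-p.errors:
        return err
    default:
        // No error pending right now: block until the WaitGroup counter is zero.
        p.wg.Wait()
    }
}

Each pass through the loop checks p.errors first and falls back to p.wg.Wait() only when no error is pending, so an error sent while Run was waiting is picked up on the next pass :)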
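One caveat: a select with a default case never blocks, so a polling loop needs some other way to learn that all work has finished. A minimal sketch of one alternative is to run wg.Wait() in its own goroutine and close a channel when it returns, so that select can block on both events. Everything named here (runTasks, okTask, failTask, the done channel) is hypothetical and not part of the original pool, and tasks are plain func() error values instead of *http.Request so the sketch runs without a network:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical stand-ins for HTTP requests, so the demo needs no network.
func okTask() error   { return nil }
func failTask() error { return errors.New("request failed") }

// runTasks (hypothetical helper) runs tasks on a fixed number of workers.
// It returns the first error it receives, or nil once every task succeeded.
func runTasks(workers int, tasks []func() error) error {
	var wg sync.WaitGroup
	queue := make(chan func() error, len(tasks))
	// Buffered so a worker can always deliver its error and move on,
	// even after runTasks has already returned.
	errs := make(chan error, len(tasks))

	wg.Add(len(tasks))
	for _, t := range tasks {
		queue <- t
	}
	close(queue) // lets the workers' range loops end

	for i := 0; i < workers; i++ {
		go func() {
			for t := range queue {
				if err := t(); err != nil {
					errs <- err
				}
				wg.Done() // mark the task finished only after it ran
			}
		}()
	}

	// Turn wg.Wait into a channel event so select can block on it.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case err := <-errs:
		return err
	case <-done:
		// All tasks finished; still report an error that raced with done.
		select {
		case err := <-errs:
			return err
		default:
			return nil
		}
	}
}

func main() {
	fmt.Println(runTasks(2, []func() error{okTask, okTask, okTask}))   // <nil>
	fmt.Println(runTasks(2, []func() error{okTask, failTask, okTask})) // request failed
}
```

In this sketch wg.Done() is called after the task has run, and the error channel is buffered, so a failing worker never blocks on the send. Both choices are assumptions of the sketch rather than something the original pool does.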

bodokaiser answered Oct 12 '22 13:10