I have a problem using sync.WaitGroup and select together. If you take a look at the following HTTP request pool, you will notice that if an error occurs it is never reported, because wg.Wait() blocks and nothing reads from the errors channel anymore.
package pool

import (
	"fmt"
	"log"
	"net/http"
	"sync"
)

var (
	MaxPoolQueue  = 100
	MaxPoolWorker = 10
)

type Pool struct {
	wg     *sync.WaitGroup
	queue  chan *http.Request
	errors chan error
}

func NewPool() *Pool {
	return &Pool{
		wg:     &sync.WaitGroup{},
		queue:  make(chan *http.Request, MaxPoolQueue),
		errors: make(chan error),
	}
}

func (p *Pool) Add(r *http.Request) {
	p.wg.Add(1)
	p.queue <- r
}

func (p *Pool) Run() error {
	for i := 0; i < MaxPoolWorker; i++ {
		go p.doWork()
	}
	select {
	case err := <-p.errors:
		return err
	default:
		p.wg.Wait()
	}
	return nil
}

func (p *Pool) doWork() {
	for r := range p.queue {
		fmt.Printf("Request to %s\n", r.Host)
		p.wg.Done()
		_, err := http.DefaultClient.Do(r)
		if err != nil {
			log.Fatal(err)
			p.errors <- err
		} else {
			fmt.Printf("no error\n")
		}
	}
}
How can I still use WaitGroup but also get errors from goroutines?
Returning values over a channel is a reliable way to collect every return value from goroutines without one overwriting another. The same return channel can be used in more than one way: it can carry only the error, or it can carry both the result and the error.
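As a minimal sketch of the second option, the following sends a result and its error together over one channel. The result type, the half function and collect are hypothetical names made up for this illustration; they do not come from the question.

```go
package main

import (
	"errors"
	"fmt"
)

// result pairs a value with the error from producing it, so both travel
// over the same channel. The type and its fields are hypothetical.
type result struct {
	input int
	value int
	err   error
}

// half is a stand-in task: it fails for odd inputs.
func half(n int) (int, error) {
	if n%2 != 0 {
		return 0, errors.New("odd input")
	}
	return n / 2, nil
}

// collect runs half on every input in its own goroutine and gathers
// exactly one result per input from a single channel.
func collect(inputs []int) []result {
	results := make(chan result, len(inputs)) // buffered: senders never block
	for _, n := range inputs {
		go func(n int) {
			v, err := half(n)
			results <- result{input: n, value: v, err: err}
		}(n)
	}
	out := make([]result, 0, len(inputs))
	for range inputs {
		out = append(out, <-results)
	}
	return out
}

func main() {
	for _, r := range collect([]int{2, 3, 8}) {
		if r.err != nil {
			fmt.Printf("half(%d) failed: %v\n", r.input, r.err)
			continue
		}
		fmt.Printf("half(%d) = %d\n", r.input, r.value)
	}
}
```

Results arrive in completion order, not input order, which is why each result carries its input along.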
The worker pool pattern is a design in which a fixed number of workers are given a stream of tasks to process in a queue. The tasks stay in the queue until a worker is free to pick up the task and execute it. Worker pools are great for controlling the concurrent execution for a set of defined jobs.
Package workerpool queues work to a limited number of goroutines. The purpose of the worker pool is to limit the concurrency of tasks executed by the workers. This is useful when tasks need significant resources (CPU, memory, etc.) and running too many of them at the same time would exhaust those resources.
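The pattern described above can be sketched with the standard library alone. This is an illustration under assumptions, not the API of any workerpool package: runPool and the squaring task are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool squares every job using at most `workers` goroutines at a time
// and returns the results in input order. The name is hypothetical.
func runPool(workers int, jobs []int) []int {
	queue := make(chan int) // carries indexes into jobs
	out := make([]int, len(jobs))

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range queue {
				// Each index is handed to exactly one worker, so no
				// two goroutines write the same slot.
				out[i] = jobs[i] * jobs[i]
			}
		}()
	}

	for i := range jobs {
		queue <- i // blocks until a worker is free to take the task
	}
	close(queue) // lets the workers' range loops end
	wg.Wait()    // all workers have returned; out is complete
	return out
}

func main() {
	fmt.Println(runPool(3, []int{1, 2, 3, 4}))
}
```

Closing the queue is what lets the workers exit; without it they would block on the channel forever.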
I found the answer myself while writing the question, and since I think it is an interesting case, I would like to share it with you.
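The trick to using sync.WaitGroup and chan together is to keep p.wg.Wait() out of the select. The select in Run checks p.errors only once and then blocks in the default branch, so an error sent later is never read:

select {
case err := <-p.errors:
	return err
default:
	p.wg.Wait()
}

Wrapping this in a for loop does not help: the loop still blocks in Wait, and once Wait returns it spins without ever returning nil. Instead, wait in a separate goroutine that closes a done channel, and select on both channels:

done := make(chan struct{})
go func() {
	p.wg.Wait() // blocks until every request has called Done
	close(done)
}()
select {
case err := <-p.errors:
	return err
case <-done:
	return nil
}

In this case select blocks until either an error arrives or every request has finished :) For this to work, doWork must call p.wg.Done() only after the request has completed and any error has been sent, and it must not call log.Fatal(err), which exits the program before the error is sent.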
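For reference, here is a self-contained sketch of one way to combine a WaitGroup with an error channel in such a pool. It rests on several assumptions that are not in the question: the errs name, the do field (a stand-in for http.DefaultClient.Do so the example runs without network access), fakeDo, errFake, mustRequest, and the rule that every Add happens before Run.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"sync"
)

var errFake = errors.New("fake request failure") // hypothetical sentinel

// Pool follows the Pool from the question. The errs name and the do field
// are assumptions; do replaces http.DefaultClient.Do so no network is needed.
type Pool struct {
	wg      sync.WaitGroup
	workers int
	queue   chan *http.Request
	errs    chan error
	do      func(*http.Request) error
}

func NewPool(workers, size int, do func(*http.Request) error) *Pool {
	return &Pool{
		workers: workers,
		queue:   make(chan *http.Request, size),
		errs:    make(chan error, 1), // holds the first error only
		do:      do,
	}
}

// Add queues a request; call it at most size times, and only before Run.
func (p *Pool) Add(r *http.Request) {
	p.wg.Add(1)
	p.queue <- r
}

// Run returns the first error, or nil once every request has been handled.
func (p *Pool) Run() error {
	close(p.queue) // no more Adds: lets each worker's range loop end
	for i := 0; i < p.workers; i++ {
		go p.doWork()
	}
	done := make(chan struct{})
	go func() {
		p.wg.Wait()
		close(done)
	}()
	select {
	case err := <-p.errs:
		return err
	case <-done:
		// Both channels may be ready at once, so check for an error again.
		select {
		case err := <-p.errs:
			return err
		default:
			return nil
		}
	}
}

func (p *Pool) doWork() {
	for r := range p.queue {
		if err := p.do(r); err != nil {
			select {
			case p.errs <- err: // first error wins
			default: // an error is already pending; drop this one
			}
		}
		p.wg.Done() // only after the request and any error send are finished
	}
}

// fakeDo fails for any request whose path is /fail.
func fakeDo(r *http.Request) error {
	if r.URL.Path == "/fail" {
		return errFake
	}
	return nil
}

// mustRequest builds a GET request and panics on a malformed URL.
func mustRequest(url string) *http.Request {
	r, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		panic(err)
	}
	return r
}

func main() {
	p := NewPool(2, 4, fakeDo)
	p.Add(mustRequest("http://example.com/ok"))
	p.Add(mustRequest("http://example.com/fail"))
	fmt.Println("Run returned:", p.Run())
}
```

The non-blocking send in doWork keeps a failing worker from hanging after Run has already returned. If the goal is simply "wait for all goroutines and get the first error", the errgroup package in golang.org/x/sync is also worth a look.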