I have a simple concurrency use case in Go, and I cannot figure out an elegant solution to my problem.
I want to write a method fetchAll that queries an unspecified number of resources from remote servers in parallel. If any of the fetches fails, I want to return that first error immediately.
My initial implementation leaks goroutines:
    package main

    import (
        "fmt"
        "math/rand"
        "sync"
        "time"
    )

    func fetchAll() error {
        wg := sync.WaitGroup{}
        errs := make(chan error)
        leaks := make(map[int]struct{})
        defer fmt.Println("these goroutines leaked:", leaks)

        // run all the http requests in parallel
        for i := 0; i < 4; i++ {
            leaks[i] = struct{}{}
            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                defer delete(leaks, i)

                // pretend this does an http request and returns an error
                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                errs <- fmt.Errorf("goroutine %d's error returned", i)
            }(i)
        }

        // wait until all the fetches are done and close the error
        // channel so the loop below terminates
        go func() {
            wg.Wait()
            close(errs)
        }()

        // return the first error
        for err := range errs {
            if err != nil {
                return err
            }
        }

        return nil
    }

    func main() {
        fmt.Println(fetchAll())
    }
Playground: https://play.golang.org/p/Be93J514R5
I know from reading https://blog.golang.org/pipelines that I can create a signal channel to clean up the other goroutines. Alternatively, I could probably use context to accomplish it. But it seems like such a simple use case should have a simpler solution that I'm missing.
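For illustration, the context-based variant mentioned above might look roughly like the sketch below. It uses only the standard library. The names `fetch` and `fetchAllCtx` are hypothetical, and the random delay stands in for a real HTTP request.

```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// fetch is a hypothetical stand-in for an HTTP request. It fails after a
// random delay, or returns early if ctx is cancelled first.
func fetch(ctx context.Context, i int) error {
	select {
	case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
		return fmt.Errorf("goroutine %d's error returned", i)
	case <-ctx.Done():
		return ctx.Err()
	}
}

// fetchAllCtx starts n fetches, records the first error, and cancels the rest.
func fetchAllCtx(n int) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := fetch(ctx, i); err != nil {
				// Only the first failing goroutine gets to run this.
				once.Do(func() {
					firstErr = err
					cancel() // tell the other fetches to stop
				})
			}
		}(i)
	}
	wg.Wait() // every goroutine has returned by the time this unblocks
	return firstErr
}

func main() {
	fmt.Println(fetchAllCtx(4))
}
```

Because every goroutine selects on `ctx.Done()`, the wait after the first failure is short, and nothing is left blocked on a channel send.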
Using a channel to return values is a straightforward way to collect every result from a set of goroutines without one overwriting another. The same return channel can be used in more than one way: it can carry only the error, or it can carry both the result and the error.
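As a rough sketch of the second option, a small struct can pair each value with its error on a single channel. The `result` type and the `square` and `collect` functions are made up for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// result pairs a value with the error from producing it (hypothetical type).
type result struct {
	value int
	err   error
}

// square is a hypothetical worker that fails on negative input.
func square(n int) (int, error) {
	if n < 0 {
		return 0, errors.New("negative input")
	}
	return n * n, nil
}

// collect runs square on every input concurrently and gathers all results.
func collect(inputs []int) (sum int, errs []error) {
	// Buffered so every sender can finish even if the receiver stops early.
	out := make(chan result, len(inputs))
	for _, n := range inputs {
		go func(n int) {
			v, err := square(n)
			out <- result{value: v, err: err}
		}(n)
	}
	// Receive exactly one result per input.
	for range inputs {
		r := <-out
		if r.err != nil {
			errs = append(errs, r.err)
			continue
		}
		sum += r.value
	}
	return sum, errs
}

func main() {
	sum, errs := collect([]int{1, 2, -3, 4})
	fmt.Println(sum, len(errs)) // prints "21 1"
}
```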
A goroutine is a lightweight thread managed by the Go runtime. The statement go f(x, y, z) starts a new goroutine running f(x, y, z). The evaluation of f, x, y, and z happens in the current goroutine, and the execution of f happens in the new goroutine.
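The evaluation rule can be seen in a tiny sketch: the argument is captured when the go statement runs, so a later assignment in the caller does not change it. The function name `launch` is invented for this example.

```go
package main

import "fmt"

// launch shows that a go statement evaluates its arguments immediately.
func launch() int {
	x := 1
	got := make(chan int)
	// x is evaluated here, in the calling goroutine, so v is 1.
	go func(v int) { got <- v }(x)
	x = 2 // too late to affect the value already passed
	return <-got
}

func main() {
	fmt.Println(launch()) // prints "1"
}
```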
The WaitGroup type in the sync package is used to wait for a collection of goroutines to finish. It keeps a counter of outstanding goroutines: Add increments it, Done decrements it, and Wait blocks the caller until the counter reaches zero.
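A minimal sketch of that counter in use is shown below. The function `countDone` is a hypothetical helper, and the mutex is only there to protect the shared tally.

```go
package main

import (
	"fmt"
	"sync"
)

// countDone starts n goroutines and waits for all of them to finish.
func countDone(n int) int {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		done int
	)
	for i := 0; i < n; i++ {
		wg.Add(1) // increment before starting the goroutine
		go func() {
			defer wg.Done() // decrement when this goroutine returns
			mu.Lock()
			done++
			mu.Unlock()
		}()
	}
	wg.Wait() // blocks until the counter is back to zero
	return done
}

func main() {
	fmt.Println(countDone(5)) // prints "5"
}
```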
Goroutine: A goroutine is a function or method that executes independently of, and concurrently with, the other goroutines in your program. In other words, every concurrently executing activity in Go is a goroutine.
Using errgroup (golang.org/x/sync/errgroup) makes this even simpler. Wait blocks until all the supplied goroutines have returned. If any of them returns an error, the group's context is cancelled so that the remaining goroutines can stop early, provided they watch that context, and the first error is the one that bubbles back up to the caller.
    package main

    import (
        "context"
        "fmt"
        "math/rand"
        "time"

        "golang.org/x/sync/errgroup"
    )

    func fetchAll(ctx context.Context) error {
        errs, ctx := errgroup.WithContext(ctx)

        // run all the http requests in parallel
        for i := 0; i < 4; i++ {
            errs.Go(func() error {
                // pretend this does an http request and returns an error,
                // unless another goroutine has already failed and ctx is cancelled
                select {
                case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
                    return fmt.Errorf("error in go routine, bailing")
                case <-ctx.Done():
                    return ctx.Err()
                }
            })
        }

        // Wait for completion and return the first error (if any)
        return errs.Wait()
    }

    func main() {
        fmt.Println(fetchAll(context.Background()))
    }