I'm trying to pick up a few of Go's concurrency patterns. I looked at implementing background workers using goroutines and input/output channels, and noticed that when I send new jobs to the input channel (essentially enqueuing new jobs), I have to do it in a goroutine or the program crashes. That is, instead of this:
for _, jobData := range dataSet {
	input <- jobData
}
I have to write this:
go func() {
	for _, jobData := range dataSet {
		input <- jobData
	}
}()
For something more concrete, I played with some nonsense code (here it is on the Go Playground):
package main

import (
	"log"
	"runtime"
)

func doWork(data int) (result int) {
	// ... some 'heavy' computation
	result = data * data
	return
}

// Worker processes values from input and sends
// the results on the output channel.
func Worker(input, output chan int) {
	for data := range input {
		output <- doWork(data)
	}
}

func ScheduleWorkers() {
	input, output := make(chan int), make(chan int)
	for i := 0; i < runtime.NumCPU(); i++ {
		go Worker(input, output)
	}
	numJobs := 20
	// THIS DOESN'T WORK
	// and crashes the program with a deadlock error
	/*
		for i := 0; i < numJobs; i++ {
			input <- i
		}
	*/
	// THIS DOES
	go func() {
		for i := 0; i < numJobs; i++ {
			input <- i
		}
	}()
	results := []int{}
	for i := 0; i < numJobs; i++ {
		// read off results
		result := <-output
		results = append(results, result)
		// do stuff...
	}
	log.Printf("Result: %#v\n", results)
}

func main() {
	ScheduleWorkers()
}
I'm trying to wrap my head around this subtle difference - help is appreciated. Thanks.
Your ScheduleWorkers function sends a value via input from the main goroutine (i.e. the one that runs the main() function, where the program starts). A Worker receives it and tries to send its result via output. But nobody is receiving from output at that point, so that Worker blocks on its send, while the main goroutine goes on and sends the next value to another Worker.
Repeat this reasoning for each Worker. You have runtime.NumCPU() workers, which is probably fewer than numJobs. Say runtime.NumCPU() == 4, so you have 4 workers. After four sends, each Worker has received exactly one value and is stuck trying to send its result. Since nobody is reading from output, no Worker can go back to receive more data via input, so the fifth input <- i hangs. At this point every goroutine is waiting on another one; that's the deadlock. The Go runtime detects it and aborts with "fatal error: all goroutines are asleep - deadlock!", which is the crash you saw.

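You will notice that if you launch numJobs (20) or more Workers instead of runtime.NumCPU(), the program works. That's because each send from the main goroutine is then received by a Worker that hasn't taken a job yet, so all 20 sends complete even though every busy Worker ends up blocked on output. The main goroutine then reaches the loop that reads from output and unblocks them.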
If instead you put the input <- i loop in its own goroutine, as in your working example, the main goroutine (in which ScheduleWorkers runs) can go on and start reading from output. Each time the new goroutine sends a value, a Worker sends a result via output, the main goroutine receives it, and that Worker is free to receive another value. No goroutine waits forever, and the program completes.