
Need help understanding why select{} isn't blocking forever

Tags: go, channel

I am working on an exercise in using channels to implement a queue. Specifically, I am trying to use the size of a channel to limit the number of simultaneous goroutines. To wit, I have written the following code:

package main

import "fmt"
import "time"
import "math/rand"

func runTask (t string, ch *chan bool) {
        start := time.Now()
        fmt.Println("starting task", t)
        time.Sleep(time.Millisecond * time.Duration(rand.Int31n(1500))) // fake processing time
        fmt.Println("done running task", t, "in", time.Since(start))
        <- *ch
}

func main() {
        numWorkers := 3
        files := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}

        activeWorkers := make(chan bool, numWorkers)

        for _, f := range files {
                activeWorkers <- true
                fmt.Printf("activeWorkers is %d long.\n", len(activeWorkers))
                go runTask(f, &activeWorkers)
        }
        select{}
}

Right now, the code crashes with:

throw: all goroutines are asleep - deadlock!

My expectation was that the call to select would block forever and let the goroutines terminate without a deadlock.

So I have a two-fold question: why isn't select blocking forever and, short of throwing in a time.Sleep() call after the for loop, how can I avoid deadlocks?

Cheers,

-mtw

mtw asked Apr 16 '12 09:04


2 Answers

Arlen Cuss has already written a good answer. I just want to suggest another design for your work queue. Instead of limiting the number of entries your channel can buffer, you can spawn a fixed number of worker goroutines, which feels more natural imho. Something like this:

package main

import "fmt"
import "time"
import "math/rand"

func runTask(t string) string {
    start := time.Now()
    fmt.Println("starting task", t)
    time.Sleep(time.Millisecond * time.Duration(rand.Int31n(1500))) // fake processing time
    fmt.Println("done running task", t, "in", time.Since(start))
    return t
}

func worker(in chan string, out chan string) {
    for t := range in {
        out <- runTask(t)
    }
}

func main() {
    numWorkers := 3

    // spawn workers
    in, out := make(chan string), make(chan string)
    for i := 0; i < numWorkers; i++ {
        go worker(in, out)
    }

    files := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}

    // schedule tasks
    go func() {
        for _, f := range files {
            in <- f
        }
        close(in) // no more tasks; lets each worker's range loop end
    }()

    // get results: exactly one per scheduled task
    for range files {
        <-out
    }
}

You can also use a sync.WaitGroup if you just want to wait until all tasks have been executed, but using an out channel has the advantage that you can aggregate the results later. For example, if each task returns the number of words in its file, the final loop can sum up the individual word counts.
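As a rough sketch of that aggregation idea, the workers below send an int per task and the final loop adds them up. The helpers countWords and totalWords are made up for illustration, and the "files" are just in-memory strings rather than real files:

```go
package main

import (
	"fmt"
	"strings"
)

// countWords is a hypothetical stand-in for real per-file work: it counts
// whitespace-separated words in an in-memory string.
func countWords(text string) int {
	return len(strings.Fields(text))
}

// worker receives texts until in is closed and sends one count per text.
func worker(in <-chan string, out chan<- int) {
	for t := range in {
		out <- countWords(t)
	}
}

// totalWords fans the texts out to numWorkers goroutines and sums the counts.
// It assumes numWorkers is at least 1.
func totalWords(texts []string, numWorkers int) int {
	in, out := make(chan string), make(chan int)
	for i := 0; i < numWorkers; i++ {
		go worker(in, out)
	}

	// Schedule tasks, then close in so the workers' range loops end.
	go func() {
		for _, t := range texts {
			in <- t
		}
		close(in)
	}()

	// Receive exactly one result per task and aggregate.
	total := 0
	for range texts {
		total += <-out
	}
	return total
}

func main() {
	texts := []string{"a b c", "d e", "f"}
	fmt.Println(totalWords(texts, 3)) // prints 6
}
```

Because the main goroutine receives exactly len(texts) results, it knows when all work is done without any extra signalling.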

tux21b answered Nov 05 '22 09:11


Firstly, you don't need to pass a pointer to the channel. Channels, like maps, behave as references: passing one to a function copies only a reference to the underlying data, not the data itself. If you ever need a pointer to a chan itself, you'll know when that time comes.
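A tiny sketch to illustrate the point (release and slotsAfterRelease are names invented for this example): the channel is passed by value, yet the receive inside the function is visible to the caller, because both refer to the same underlying channel.

```go
package main

import "fmt"

// release receives one value from ch. ch is passed by value, but it still
// refers to the same underlying channel the caller created.
func release(ch chan bool) {
	<-ch
}

// slotsAfterRelease fills a buffered channel of capacity n (n must be at
// least 1), frees one slot via release, and reports how many values remain.
func slotsAfterRelease(n int) int {
	ch := make(chan bool, n)
	for i := 0; i < n; i++ {
		ch <- true
	}
	release(ch)
	return len(ch)
}

func main() {
	fmt.Println(slotsAfterRelease(3)) // prints 2
}
```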

The crash occurs because the program gets into a state where every goroutine is blocked. The runtime treats that as a deadlock: if every goroutine is blocked, nothing is left that could ever wake one of them up, and your program would otherwise hang forever.

The main goroutine winds up in a select {}, which waits on nothing and so blocks forever. Once the last runTask goroutine finishes, only the main goroutine is left, and nothing can ever wake it. So select {} is blocking forever; that is exactly the condition the runtime detects and reports.
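To see that select {} really does block, here is a small sketch (stillBlocked is a name made up for this example). It parks a goroutine in select {} and checks, after a short timer, that the goroutine never returned. No deadlock is reported here because the calling goroutine is waiting on a timer and can still be woken:

```go
package main

import (
	"fmt"
	"time"
)

// stillBlocked starts a goroutine that parks in select {} and reports whether
// it is still parked after waitMs milliseconds. The goroutine is deliberately
// leaked; it can never return.
func stillBlocked(waitMs int) bool {
	returned := make(chan struct{})
	go func() {
		defer close(returned) // would only run if select {} ever returned
		select {}             // waits on no cases, so it blocks forever
	}()

	select {
	case <-returned:
		return false
	case <-time.After(time.Duration(waitMs) * time.Millisecond):
		return true
	}
}

func main() {
	fmt.Println(stillBlocked(100)) // prints true
}
```

Take away the timer and every other runnable goroutine, as happens in the question once all runTask calls have returned, and the runtime has nothing left to schedule. It then aborts with the "all goroutines are asleep" error instead of hanging silently.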

You'll need to add some way to know when every goroutine has finished; perhaps another channel can receive finish events.

This is a bit ugly, but might be some inspiration.

package main

import "fmt"
import "time"
import "math/rand"

func runTask(t string, ch chan bool, finishedCh chan bool) {
    start := time.Now()
    fmt.Println("starting task", t)
    time.Sleep(time.Millisecond * time.Duration(rand.Int31n(1500))) // fake processing time
    fmt.Println("done running task", t, "in", time.Since(start))
    <-ch
    finishedCh <- true
}

func main() {
    numWorkers := 3
    files := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}

    activeWorkers := make(chan bool, numWorkers)
    finishedWorkers := make(chan bool)
    done := make(chan bool)

    go func() {
        remaining := len(files)
        for remaining > 0 {
            <-finishedWorkers
            remaining--
        }

        done <- true
    }()

    for _, f := range files {
        activeWorkers <- true
        fmt.Printf("activeWorkers is %d long.\n", len(activeWorkers))
        go runTask(f, activeWorkers, finishedWorkers)
    }

    <-done
}
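
For comparison, here is a hedged sketch of the same idea using sync.WaitGroup in place of the finishedWorkers and done channels. The buffered channel still caps concurrency as in the question. The names runAll, sem, and work are assumptions made for this example, and the completed counter exists only so the result can be checked:

```go
package main

import (
	"fmt"
	"sync"
)

// runAll runs work for every task with at most limit tasks in flight
// (limit must be at least 1) and returns how many tasks completed.
func runAll(tasks []string, limit int, work func(string)) int {
	sem := make(chan struct{}, limit) // counting semaphore, like activeWorkers
	var wg sync.WaitGroup
	var mu sync.Mutex
	completed := 0

	for _, t := range tasks {
		sem <- struct{}{} // blocks while limit tasks are already running
		wg.Add(1)
		go func(t string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when this task ends
			work(t)
			mu.Lock()
			completed++
			mu.Unlock()
		}(t)
	}

	wg.Wait() // returns once every goroutine has called Done
	return completed
}

func main() {
	files := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
	n := runAll(files, 3, func(t string) {
		fmt.Println("done running task", t)
	})
	fmt.Println("completed", n, "tasks")
}
```

The main goroutine now blocks in wg.Wait() rather than select {}, so it is woken as soon as the last task finishes and the program exits normally.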
Asherah answered Nov 05 '22 07:11