Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Exiting forever loop using channels - issues with Go Playground

Tags:

I am trying to implement a simple logic where a Producer sends data to a channel ch with an forever for loop and a Consumer reads from the channel ch.

The Producer stops producing and exit the forever loop when it receives a signal on the channel quit.

The code is this (see also this playground)

func main() {
    ch := make(chan int)
    quit := make(chan bool)
    var wg sync.WaitGroup
    wg.Add(1)
    go produce(ch, quit, &wg)
    go consume(ch)
    time.Sleep(1 * time.Millisecond)
    fmt.Println("CLOSE")
    close(quit)
    wg.Wait()
}

func produce(ch chan int, quit chan bool, wg *sync.WaitGroup) {
    for i := 0; ; i++ {
        select {
        case <-quit:
            close(ch)
            fmt.Println("exit")
            wg.Done()
            return //we exit
        default:
            ch <- i
            fmt.Println("Producer sends", i)
        }
    }
}

func consume(ch chan int) {
    for {
        runtime.Gosched() // give the opportunity to the main goroutine to close the "quit" channel
        select {
        case i, more := <-ch:
            if !more {
                fmt.Println("exit consumer")
                return
            }
            fmt.Println("Consumer receives", i)
        }
    }
}

If I run this piece of code on my machine (a Mac with 4 cores) everything works fine. If I try the same code on the Go Playgroud it always times out. I guess that this because the Go Playground is a single core and so the infinite loop does not give the chance to other goroutines to run, but then I do not understand why the instruction runtime.Gosched() does not have any effect.

Just to complete the picture I have seen that, if I set GOMAXPROCS=1 on my Mac, the program still works fine and exits as expected. If I set GOMAXPROCS=1 on my Mac and remove the runtime.Gosched() instruction, the behavior gets brittle: sometimes the program terminates as expected, some other times it seems to never exit the infinite loop.

like image 414
Picci Avatar asked Sep 27 '20 13:09

Picci


1 Answers

You created a pathological situation that shouldn't happen in a real program, so the scheduler is not optimized to handle this. Combined with the fake time implementation in the playground, and you get far too many cycles of the producer and consumer before hitting a timeout.

The producer goroutine is creating values as fast as possible, while the consumer is always ready to receive them. With GOMAPXPROCS=1, the scheduler spends all its time bouncing between the two before it is forced to preempt the available work to check on the main goroutine, which takes longer than the playground will allow.

If we add something for the producer-consumer pair to do, we can limit the amount of time they have to monopolize the scheduler. For example, adding a time.Sleep(time.Microsecond) to the consumer will cause the playground to print 1000 values. This also goes to show how "accurate" the simulated time is in the playground, since that would not be possible with normal hardware which takes a non-zero amount time to process each message.

While an interesting case, this has little bearing on real programs.

A few notes, you can range over a channel to receive all values, you should always defer wg.Done at the start of the goroutine when possible, you can send values in the select case which allows you to actually cancel the for-select loop when the send isn't ready, and if you want the "exit consumer" message you need to send the WaitGroup to the consumer as well.

https://play.golang.org/p/WyPmpY9pFl7

func main() {
    ch := make(chan int)
    quit := make(chan bool)
    var wg sync.WaitGroup
    wg.Add(2)
    go produce(ch, quit, &wg)
    go consume(ch, &wg)
    time.Sleep(50 * time.Microsecond)
    fmt.Println("CLOSE")
    close(quit)
    wg.Wait()
}

func produce(ch chan int, quit chan bool, wg *sync.WaitGroup) {
    defer wg.Done()

    for i := 0; ; i++ {
        select {
        case <-quit:
            close(ch)
            fmt.Println("exit")
            return
        case ch <- i:
            fmt.Println("Producer sends", i)
        }
    }
}

func consume(ch chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for i := range ch {
        fmt.Println("Consumer receives", i)
        time.Sleep(time.Microsecond)
    }
    
    fmt.Println("exit consumer")
    return
}
like image 60
JimB Avatar answered Oct 13 '22 20:10

JimB