Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Should one drain a buffered channel when closing it

Tags:

go

channel

Given a (partially) filled buffered channel in Go

ch := make(chan *MassiveStruct, n)
for i := 0; i < n; i++ {
    ch <- NewMassiveStruct()
}

is it advisable to also drain the channel when closing it (by the writer) in case it is unknown when readers are going read from it (e.g. there is a limited number of those and they are currently busy)? That is

close(ch)
for range ch {}

Is such a loop guaranteed to end if there are other concurrent readers on the channel?

Context: a queue service with a fixed number of workers, which should drop processing anything queued when the service is going down (but not necessarily being GCed right after). So I am closing to indicate to the workers that the service is being terminated. I could drain the remaining "queue" immediately letting the GC free the resources allocated, I could read and ignore the values in the workers and I could leave the channel as is running down the readers and setting the channel to nil in the writer so that the GC cleans up everything. I am not sure which is the cleanest way.

like image 250
Oleg Sklyar Avatar asked Mar 06 '16 13:03

Oleg Sklyar


3 Answers

It depends on your program, but generally speaking I would tend to say no (you don't need to clear the channel before closing it): if there is items in your channel when you close it, any reader still reading from the channel will receive the items until the channel is emtpy.

Here is an example:

package main

import (
    "sync"
    "time"
)

func main() {

    var ch = make(chan int, 5)
    var wg sync.WaitGroup
    wg.Add(1)

    for range make([]struct{}, 2) {
        go func() {
            for i := range ch {
                wg.Wait()
                println(i)
            }
        }()
    }

    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)

    wg.Done()
    time.Sleep(1 * time.Second)
}

Here, the program will output all the items, despite the fact that the channel is closed strictly before any reader can even read from the channel.

like image 181
Elwinar Avatar answered Sep 26 '22 18:09

Elwinar


There are better ways to achieve what you're trying to achieve. Your current approach can just lead to throwing away some records, and processing other records randomly (since the draining loop is racing all the consumers). That doesn't really address the goal.

What you want is cancellation. Here's an example from Go Concurrency Patterns: Pipelines and cancellation

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

You pass a done channel to all the goroutines, and you close it when you want them all to stop processing. If you do this a lot, you may find the golang.org/x/net/context package useful, which formalizes this pattern, and adds some extra features (like timeout).

like image 45
Rob Napier Avatar answered Sep 22 '22 18:09

Rob Napier


I feel that the supplied answers actually do not clarify much apart from the hints that neither drain nor closing is needed. As such the following solution for the described context looks clean to me that terminates the workers and removes all references to them or the channel in question, thus, letting the GC to clean up the channel and its content:

type worker struct {
    submitted chan Task
    stop      chan bool
    p         *Processor
}

// executed in a goroutine
func (w *worker) run() {
    for {
        select {
        case task := <-w.submitted:
            if err := task.Execute(w.p); err != nil {
                logger.Error(err.Error())
            }
        case <-w.stop:
            logger.Warn("Worker stopped")
            return
        }
    }
}

func (p *Processor) Stop() {
    if atomic.CompareAndSwapInt32(&p.status, running, stopped) {
        for _, w := range p.workers {
            w.stop <- true
        }
        // GC all workers as soon as goroutines stop
        p.workers = nil
        // GC all published data when workers terminate
        p.submitted = nil
        // no need to do the following above:
        // close(p.submitted)
        // for range p.submitted {}
    }
}
like image 43
Oleg Sklyar Avatar answered Sep 26 '22 18:09

Oleg Sklyar