Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to write a better two channel select

Tags:

go

goroutine

In the following code there are two channels, A and B, that contain work; in the real code they are different structures. The workers need to drain both channels before quitting, and they need the information coming in from both channels. The two select statements work, but they are very clumsy. If I add default: to make them non-blocking, then the code fails to drain the channels. Is there a better way of writing the selects?

Right now if channel A has no work then channel B does not get serviced either. Another problem to solve, but not my main concern.

playground for testing following code:

package main

import (
    "fmt"
    "time"
)

// Sizes used by the producers and by main.
const (
    fillCount  = 10 // number of elements sent into each input channel before it is closed
    numWorkers = 3  // number of consumer goroutines started by main
)

// Wait blocks the calling goroutine for two seconds.
func Wait() {
    const pause = 2 * time.Second
    time.Sleep(pause)
}

// fillChannel is a producer: it sends fillCount strings of the form
// "<name><index>" (index starting at 0) on work and then closes the channel so
// that receivers can tell no more values are coming.
func fillChannel(work chan string, name string) {
    // Closing on return marks the end of the stream.
    defer close(work)

    for seq := 0; seq != fillCount; seq++ {
        item := fmt.Sprintf("%s%d", name, seq)
        work <- item
    }
}

// doWork is a consumer: on every loop iteration it performs a blocking receive
// on ch1 followed by a blocking receive on ch2, printing each value it gets,
// and it sends true on done just before returning. id only labels the output.
//
// NOTE(review): the loop condition is ch1Open && ch2Open, so the loop stops as
// soon as either channel has been observed closed; values still queued on the
// other channel are not received by this worker.
func doWork(id int, ch1 chan string, ch2 chan string, done chan bool) {
    fmt.Println("Running worker", id)
    // Deferred, so this line is printed after the send on done below completes.
    defer fmt.Println("Ending worker", id)

    for ch1Open, ch2Open := true, true; ch1Open && ch2Open; {
        // Queue lengths sampled before the receives; used only in the log lines.
        cnt1 := len(ch1)
        cnt2 := len(ch2)

        if ch1Open {
            // A select with one case and no default blocks like a plain
            // receive, so ch2 is not looked at until ch1 yields a value or is
            // closed.
            select {
            case str, more := <-ch1:
                if more {
                    fmt.Printf("%d: ch1(%d) %s\n", id, cnt1, str)
                } else {
                    // more == false: ch1 is closed and empty.
                    fmt.Printf("%d: ch1 closed\n", id)
                    ch1Open = false
                }
            }
        }

        if ch2Open {
            // Same single-case blocking receive, this time on ch2.
            select {
            case str, more := <-ch2:
                if more {
                    fmt.Printf("%d: ch2(%d) %s\n", id, cnt2, str)
                } else {
                    // more == false: ch2 is closed and empty.
                    fmt.Printf("%d: ch2 closed\n", id)
                    ch2Open = false
                }
            }
        }
    }
    // Tell main that this worker has finished.
    done <- true
}

// main wires the demo together: two producers fill the buffered channels,
// numWorkers consumers read from both, and main blocks until every consumer
// has reported on the done channel.
func main() {
    // Differently sized buffers for the two inputs.
    chA := make(chan string, 2)
    chB := make(chan string, 5)

    // Unbuffered: each completion signal is handed directly to main.
    finished := make(chan bool)

    // Producers.
    go fillChannel(chA, "A")
    go fillChannel(chB, "B")

    // Consumers.
    for id := 0; id < numWorkers; id++ {
        go doWork(id, chA, chB, finished)
    }

    // Collect one completion signal per consumer.
    for pending := numWorkers; pending > 0; pending-- {
        <-finished
    }
    fmt.Println("All workers done.")

    // Each worker's deferred "Ending worker" print runs only after its send on
    // the done channel, so without this pause main could exit before those
    // lines are written.
    Wait()
}
like image 748
AndrewN Avatar asked Feb 12 '23 00:02

AndrewN


1 Answers

Select on both channels in a loop. When a channel is closed, set the channel variable to nil to make receive on that channel not ready. Break out of the loop when both channels are nil.

http://play.golang.org/p/9gRY1yKqJ9

package main

import (
    "fmt"
    "time"
)

// Sizes used by the producers and by main.
const (
    fillCount  = 10 // number of elements sent into each input channel before it is closed
    numWorkers = 3  // number of consumer goroutines started by main
)

// fillChannel is a producer: it sends fillCount strings of the form
// "<name><index>" (index starting at 0) on work and then closes the channel so
// that receivers can tell no more values are coming.
func fillChannel(work chan string, name string) {
    // Closing on return marks the end of the stream.
    defer close(work)

    for seq := 0; seq != fillCount; seq++ {
        item := fmt.Sprintf("%s%d", name, seq)
        work <- item
    }
}

// doWork is a consumer that services ch1 and ch2 from a single select until
// both have been closed and drained, printing every value it receives. It then
// prints an "Ending worker" line and sends true on done. id only labels the
// output.
//
// When a receive reports that its channel is closed, the local channel
// variable is set to nil. A receive from a nil channel never becomes ready, so
// that case drops out of the select and the loop ends once both are nil.
func doWork(id int, ch1 chan string, ch2 chan string, done chan bool) {
    fmt.Println("Running worker", id)

    for {
        if ch1 == nil && ch2 == nil {
            break
        }

        select {
        case msg, open := <-ch1:
            if !open {
                ch1 = nil
                fmt.Printf("%d: ch1 closed\n", id)
                continue
            }
            // len is taken after the receive, i.e. what is still buffered.
            fmt.Printf("%d: ch1(%d) %s\n", id, len(ch1), msg)

        case msg, open := <-ch2:
            if !open {
                ch2 = nil
                fmt.Printf("%d: ch2 closed\n", id)
                continue
            }
            fmt.Printf("%d: ch2(%d) %s\n", id, len(ch2), msg)
        }
    }

    fmt.Println("Ending worker", id)
    done <- true
}

// main wires the demo together: two producers fill the buffered channels,
// numWorkers consumers read from both, and main blocks until every consumer
// has reported on the done channel.
func main() {
    // Differently sized buffers for the two inputs.
    chA := make(chan string, 2)
    chB := make(chan string, 5)

    // Unbuffered: each completion signal is handed directly to main.
    finished := make(chan bool)

    // Producers.
    go fillChannel(chA, "A")
    go fillChannel(chB, "B")

    // Consumers.
    for id := 0; id < numWorkers; id++ {
        go doWork(id, chA, chB, finished)
    }

    // Collect one completion signal per consumer.
    for pending := numWorkers; pending > 0; pending-- {
        <-finished
    }
    fmt.Println("All workers done.")
}
like image 191
Simon Fox Avatar answered Mar 04 '23 20:03

Simon Fox