
Go: one channel with multiple listeners

Tags:

go

I'm pretty new to Go, so sorry if the title is off, but I hope the question is clear. I want to deliver events to several goroutines via a channel. Here is some sample code:

type Event struct {
    Host string
    Command string
    Output string
}


var (
    incoming        = make(chan Event)
)

func processEmail(ticker *time.Ticker) {
    for {
        select {
        case t := <-ticker.C:
            fmt.Println("Email Tick at", t)
        case e := <-incoming:
            fmt.Println("EMAIL GOT AN EVENT!")
            fmt.Println(e)
        }
    }
}

func processPagerDuty(ticker *time.Ticker) {
    for {
        select {
        case t := <-ticker.C:
            fmt.Println("Pagerduty Tick at", t)
        case e := <-incoming:
            fmt.Println("PAGERDUTY GOT AN EVENT!")
            fmt.Println(e)
        }
    }
}

func main() {

    err := gcfg.ReadFileInto(&cfg, "dispatch-api.cfg")
    if err != nil {
        fmt.Printf("Error loading the config")
    }

    emailTicker := time.NewTicker(time.Second * 10)
    go processEmail(emailTicker)

    pagerTicker := time.NewTicker(time.Second * 1)
    go processPagerDuty(pagerTicker)
}


func eventAdd(r render.Render, params martini.Params, req *http.Request) {

    // create an event now
    e := Event{Host: "web01-east.domain.com", Command: "foo", Output: "bar"}
    incoming <- e
}

The ticker events work fine. But when I issue an API call to create an event, I only get output from the processEmail function. Whichever goroutine receives from the channel first gets the event, and the other never sees it.

Is there a way for both functions to get that event?

Mike asked Feb 15 '15 14:02



2 Answers

Channels are a point-to-point communication method, not a broadcast one: each value sent is received by exactly one goroutine. So no, you can't get both functions to receive the event without doing something special.

You could have separate channels for both goroutines and send the message into each. This is probably the simplest solution.

Or alternatively you could get one goroutine to signal the next one.
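As a rough illustration of chaining, here is a sketch in which each stage handles a value and then passes it on to the next stage. The stage and chain functions are hypothetical names, and the event is a plain string to keep the example short:

```go
package main

import "fmt"

// stage handles every value from in (here it just appends its name)
// and forwards the result to out. It closes out when in is closed.
func stage(name string, in <-chan string, out chan<- string) {
	for v := range in {
		out <- v + " -> " + name
	}
	close(out)
}

// chain pushes one event through two stages in sequence and
// returns what comes out of the last one.
func chain(event string) string {
	first := make(chan string)
	second := make(chan string)
	done := make(chan string)

	go stage("email", first, second)
	go stage("pagerduty", second, done)

	first <- event
	close(first)
	return <-done
}

func main() {
	fmt.Println(chain("event")) // prints "event -> email -> pagerduty"
}
```

Here the stages see the event one after another rather than at the same time, so a stalled stage holds up everything behind it.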

Go has two mechanisms for doing broadcast signalling as far as I know. One is closing a channel. This only works a single time though.

The other is to use a sync.Cond condition variable. It is moderately tricky to use, but its Broadcast method wakes every goroutine waiting on it with a single call.

If I were you, I'd go for the first option and send the event to two different channels. That seems to map to the problem quite well.

Nick Craig-Wood answered Oct 05 '22 00:10


You can use the fan-in and fan-out patterns (from Rob Pike's talk):

package main

func main() {
    // feeders - feeder1, feeder2 and feeder3 are used to fan in
    // data into one channel
    go func() {
        for {
            select {
            case v1 := <-feeder1:
                mainChannel <- v1
            case v2 := <-feeder2:
                mainChannel <- v2
            case v3 := <-feeder3:
                mainChannel <- v3
            }
        }
    }()

    // dispatcher - not really fan-out: it delivers every value to all consumers
    go func() {
        for {
            v := <-mainChannel

            // wait until every consumer has taken the value before reading
            // the next one, so sender goroutines do not pile up
            // (e.g. when one consumer gets stuck)
            done := make(chan bool)

            go func() {
                consumer1 <- v
                done <- true
            }()
            go func() {
                consumer2 <- v
                done <- true
            }()
            go func() {
                consumer3 <- v
                done <- true
            }()

            <-done
            <-done
            <-done
        }
    }()

    // alternative to the dispatcher above: fan out
    // (when having just one consumer process each value is enough)
    go func() {
        for {
            v := <-mainChannel
            select {
            case consumer1 <- v:
            case consumer2 <- v:
            case consumer3 <- v:
            }
        }
    }()

    // consumers(your logic)
    go func() { <-consumer1 /* using the value */ }()
    go func() { <-consumer2 /* using the value */ }()
    go func() { <-consumer3 /* using the value */ }()
}

type payload int

var (
    feeder1 = make(chan payload)
    feeder2 = make(chan payload)
    feeder3 = make(chan payload)

    mainChannel = make(chan payload)

    consumer1 = make(chan payload)
    consumer2 = make(chan payload)
    consumer3 = make(chan payload)
)
Kaveh Shahbazian answered Oct 04 '22 23:10