 

How to broadcast message using channel

I am new to Go and I am trying to create a simple chat server where clients can broadcast messages to all connected clients.

In my server, I have a goroutine (an infinite for loop) that accepts connections and sends each one on a channel.

go func() {
    for {
        conn, _ := listener.Accept()
        ch <- conn
    }
}()

Then, I start a handler (goroutine) for every connected client. Inside the handler, I try to broadcast to all connections by iterating through the channel.

for c := range ch {
    c.Write(msg)
}

However, I cannot broadcast because (I think from reading the docs) the channel needs to be closed before iterating. I am not sure when I should close the channel because I want to continuously accept new connections and closing the channel won't let me do that. If anyone can help me, or provide a better way to broadcast messages to all connected clients, it would be appreciated.

Jin Hoon Jeffrey Bang asked Apr 05 '16 04:04



2 Answers

What you are doing is the fan-out pattern: multiple receivers listen on a single input channel. With this pattern, each message sent on the channel is delivered to exactly one of the receivers. The only exception is closing the channel: the close is observed by every receiver, which makes it a kind of "broadcast".

But what you want is to broadcast a message read from a connection, so we could do something like this:

When the number of listeners is known

Let each worker listen on its own dedicated broadcast channel, and dispatch every message from the main channel to each of those dedicated channels.

type worker struct {
    source chan interface{}
    quit chan struct{}
}

func (w *worker) Start() {
    w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
    go func() {
        for {
            select {
            case msg := <-w.source:
                _ = msg // do something with msg
            case <-w.quit: // will explain this in the last section
                return
            }
        }
    }()
}

And then we could have a bunch of workers:

workers := []*worker{&worker{}, &worker{}}
for _, worker := range workers { worker.Start() }

Then start our listener:

go func() {
    for {
        conn, _ := listener.Accept()
        ch <- conn
    }
}()

And a dispatcher:

go func() {
    for {
        msg := <- ch
        for _, worker := range workers {
            worker.source <- msg
        }
    }
}()

When the number of listeners is not known

In this case, the solution above still works. The only difference is that whenever you need a new worker, you create it, start it, and then push it onto the workers slice. This requires a thread-safe slice, which needs a lock around it. One possible implementation looks like this:

type threadSafeSlice struct {
    sync.Mutex
    workers []*worker
}

func (slice *threadSafeSlice) Push(w *worker) {
    slice.Lock()
    defer slice.Unlock()

    slice.workers = append(slice.workers, w)
}

func (slice *threadSafeSlice) Iter(routine func(*worker)) {
    slice.Lock()
    defer slice.Unlock()

    for _, worker := range slice.workers {
        routine(worker)
    }
}

Whenever you want to start a worker:

// pool is a shared *threadSafeSlice, e.g. pool := &threadSafeSlice{}
w := &worker{}
w.Start()
pool.Push(w)

And your dispatcher will be changed to:

go func() {
    for {
        msg := <- ch
        pool.Iter(func(w *worker) { w.source <- msg }) // pool is the shared *threadSafeSlice
    }
}()

Last words: never leave a dangling goroutine

A good practice is to never leave a dangling goroutine. When you have finished listening, you need to stop all of the goroutines you started. This is done via the quit channel in worker:

First we need to create a global quit signalling channel:

globalQuit := make(chan struct{})

And whenever we create a worker, we assign the globalQuit channel to it as its quit signal:

w.quit = globalQuit // w is the *worker; set this before calling w.Start()

Then when we want to shutdown all workers, we simply do:

close(globalQuit)

Since a close is observed by every receiving goroutine (this is the point you already understood), all of the worker goroutines will return. Remember to stop your dispatcher goroutine as well, but I will leave that to you :)

nevets answered Oct 19 '22 03:10


A more elegant solution is a "broker", where clients may subscribe to and unsubscribe from messages.

To handle subscribing and unsubscribing elegantly as well, we can use channels for them too. The broker's main loop, which receives and distributes the messages, can then handle everything in a single select statement, and synchronization follows from the nature of the solution.

Another trick is to store the subscribers in a map, mapping from the channel we use to distribute messages to them. So use the channel as the key in the map, and then adding and removing the clients is "dead" simple. This is made possible because channel values are comparable, and their comparison is very efficient as channel values are simple pointers to channel descriptors.

Without further ado, here's a simple broker implementation:

type Broker struct {
    stopCh    chan struct{}
    publishCh chan interface{}
    subCh     chan chan interface{}
    unsubCh   chan chan interface{}
}

func NewBroker() *Broker {
    return &Broker{
        stopCh:    make(chan struct{}),
        publishCh: make(chan interface{}, 1),
        subCh:     make(chan chan interface{}, 1),
        unsubCh:   make(chan chan interface{}, 1),
    }
}

func (b *Broker) Start() {
    subs := map[chan interface{}]struct{}{}
    for {
        select {
        case <-b.stopCh:
            return
        case msgCh := <-b.subCh:
            subs[msgCh] = struct{}{}
        case msgCh := <-b.unsubCh:
            delete(subs, msgCh)
        case msg := <-b.publishCh:
            for msgCh := range subs {
                // msgCh is buffered, use non-blocking send to protect the broker:
                select {
                case msgCh <- msg:
                default:
                }
            }
        }
    }
}

func (b *Broker) Stop() {
    close(b.stopCh)
}

func (b *Broker) Subscribe() chan interface{} {
    msgCh := make(chan interface{}, 5)
    b.subCh <- msgCh
    return msgCh
}

func (b *Broker) Unsubscribe(msgCh chan interface{}) {
    b.unsubCh <- msgCh
}

func (b *Broker) Publish(msg interface{}) {
    b.publishCh <- msg
}

Example using it:

func main() {
    // Create and start a broker:
    b := NewBroker()
    go b.Start()

    // Create and subscribe 3 clients:
    clientFunc := func(id int) {
        msgCh := b.Subscribe()
        for {
            fmt.Printf("Client %d got message: %v\n", id, <-msgCh)
        }
    }
    for i := 0; i < 3; i++ {
        go clientFunc(i)
    }

    // Start publishing messages:
    go func() {
        for msgId := 0; ; msgId++ {
            b.Publish(fmt.Sprintf("msg#%d", msgId))
            time.Sleep(300 * time.Millisecond)
        }
    }()

    time.Sleep(time.Second)
}

Output of the above will look like this (try it on the Go Playground); the order of the clients within each message may vary between runs:

Client 2 got message: msg#0
Client 0 got message: msg#0
Client 1 got message: msg#0
Client 2 got message: msg#1
Client 0 got message: msg#1
Client 1 got message: msg#1
Client 1 got message: msg#2
Client 2 got message: msg#2
Client 0 got message: msg#2
Client 2 got message: msg#3
Client 0 got message: msg#3
Client 1 got message: msg#3

Improvements

You may consider the following improvements. These may or may not be useful depending on how and for what you use the broker.

Broker.Unsubscribe() may close the message channel, signalling that no more messages will be sent on it:

func (b *Broker) Unsubscribe(msgCh chan interface{}) {
    b.unsubCh <- msgCh
    close(msgCh)
}

This would allow clients to range over the message channel, like this:

msgCh := b.Subscribe()
for msg := range msgCh {
    fmt.Printf("Client %d got message: %v\n", id, msg)
}

Then if someone unsubscribes this msgCh like this:

b.Unsubscribe(msgCh)

The above range loop will terminate after processing all messages that were sent before the call to Unsubscribe().

If you want your clients to rely on the message channel being closed, and the broker's lifetime is narrower than your app's lifetime, then you could also close all subscribed clients when the broker is stopped, in the Start() method like this:

case <-b.stopCh:
    for msgCh := range subs {
        close(msgCh)
    }
    return

icza answered Oct 19 '22 03:10