Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Go: One producer many consumers

So I have seen a lot of ways of implementing one consumer and many producers in Go - the classic fanIn function from the Concurrency in Go talk.

What I want is a fanOut function. It takes a channel as a parameter, reads values from it, and returns a slice of channels to which it writes a copy of each value.

Is there a correct/recommended way of implementing this?

like image 690
Carl Avatar asked Jun 05 '13 01:06

Carl


2 Answers

You pretty much described the best way to do it, but here is a small code sample that implements it.

Go playground: https://play.golang.org/p/jwdtDXVHJk

package main

import (
    "fmt"
    "time"
)

// producer starts a goroutine that sends the integers 0 through iters-1 on
// the returned channel, pausing one second after each send, and closes the
// channel once every value has been sent. A non-positive iters yields a
// channel that is closed without any value being sent.
func producer(iters int) <-chan int {
    out := make(chan int)
    emit := func() {
        // Closing on exit lets receivers that range over the channel finish.
        defer close(out)
        for n := 0; n < iters; n++ {
            out <- n
            time.Sleep(1 * time.Second)
        }
    }
    go emit()
    return out
}

// consumer prints each value received from cin on its own line and returns
// once cin has been closed and fully drained.
func consumer(cin <-chan int) {
    for {
        v, open := <-cin
        if !open {
            return
        }
        fmt.Println(v)
    }
}

// fanOut returns size output channels and starts a goroutine that copies
// every value received from ch onto each of them, in slice order.
//
// Each output channel is buffered with capacity lag. Because values are
// delivered to the outputs one after another, a receiver whose buffer is
// full stalls delivery to all outputs; lag therefore bounds how far one
// receiver may fall behind the others before that happens.
//
// When ch is closed and drained, every output channel is closed.
func fanOut(ch <-chan int, size, lag int) []chan int {
    outs := make([]chan int, size)
    for idx := 0; idx < size; idx++ {
        outs[idx] = make(chan int, lag)
    }
    broadcast := func() {
        for v := range ch {
            for idx := range outs {
                outs[idx] <- v
            }
        }
        // The input is exhausted: signal completion to every receiver.
        for idx := range outs {
            close(outs[idx])
        }
    }
    go broadcast()
    return outs
}

// fanOutUnbuffered returns size unbuffered output channels and starts a
// goroutine that copies every value received from ch onto each of them, in
// slice order.
//
// The outputs have no buffer, so each send waits for its receiver: no
// receiver can run ahead of the others, and one slow receiver holds up
// delivery to all of them.
//
// When ch is closed and drained, every output channel is closed.
func fanOutUnbuffered(ch <-chan int, size int) []chan int {
    outs := make([]chan int, size)
    for idx := 0; idx < size; idx++ {
        outs[idx] = make(chan int)
    }
    broadcast := func() {
        for v := range ch {
            for idx := range outs {
                outs[idx] <- v
            }
        }
        // The input is exhausted: signal completion to every receiver.
        for idx := range outs {
            close(outs[idx])
        }
    }
    go broadcast()
    return outs
}

// main wires one producer to three consumers through fanOutUnbuffered.
//
// All consumers but the last run in background goroutines; the last runs on
// the main goroutine. Because a Go program exits as soon as main returns,
// main waits for every background consumer to finish so that none of them
// is cut off before printing its final value.
func main() {
    c := producer(10)
    chans := fanOutUnbuffered(c, 3)

    // done receives one signal per background consumer that has returned.
    done := make(chan struct{})
    background := chans[:len(chans)-1]
    for _, ch := range background {
        go func(ch <-chan int) {
            consumer(ch)
            done <- struct{}{}
        }(ch)
    }

    consumer(chans[len(chans)-1])

    for range background {
        <-done
    }
}

The important part to note is how we close the output channels once the input channel has been exhausted. Also, if a send on one of the output channels blocks, it holds up the sends to the other output channels. With buffered output channels (as in fanOut), we control the amount of lag that is tolerated by setting the buffer size of the channels.

like image 161
Jeremy Wall Avatar answered Nov 20 '22 18:11

Jeremy Wall


This solution below is a bit contrived, but it works for me:

package main

import (
    "fmt"
    "time"
    "crypto/rand"
    "encoding/binary"
)

// handleNewChannels maintains the list of subscriber channels.
//
// It first publishes an empty list on arrchangen, then, for every channel
// received on intchangen, appends it to the list and publishes the updated
// list on arrchangen. It returns when intchangen is closed; without that,
// receives on the closed channel would yield nil channels forever and those
// would be appended and published.
//
// Published slices may share a backing array with later ones, so receivers
// should treat them as read-only.
func handleNewChannels(arrchangen chan [](chan uint32),
                       intchangen chan (chan uint32)) {
    currarr := []chan uint32{}
    arrchangen <- currarr
    for newchan := range intchangen {
        currarr = append(currarr, newchan)
        arrchangen <- currarr
    }
}

// sendToChannels broadcasts a random uint32 to every subscriber channel once
// per second.
//
// It takes its initial subscriber list from arrchangen and replaces it
// whenever a new list arrives there. On each tick it reads a random value
// from crypto/rand and sends it to every channel in the current list, in
// order; each send blocks until that subscriber receives. A line is printed
// only when at least one subscriber was sent the value. The function never
// returns.
func sendToChannels(arrchangen chan [](chan uint32)) {
    tick := time.Tick(1 * time.Second)
    currarr := <-arrchangen
    for {
        select {
        case <-tick:
            var n uint32
            if err := binary.Read(rand.Reader, binary.LittleEndian, &n); err != nil {
                // Skip this tick rather than broadcast a bogus zero value.
                fmt.Println("Failed to generate random value:", err)
                continue
            }
            for _, c := range currarr {
                c <- n
            }
            if len(currarr) > 0 {
                fmt.Println("Sent generated ", n)
            }
        case newarr := <-arrchangen:
            currarr = newarr
        }
    }
}
// handleChannel prints every value received from tchan and returns once
// tchan has been closed and drained. Ranging over the channel avoids
// spinning on zero values after a close, which a bare receive loop would do.
func handleChannel(tchan chan uint32) {
    for val := range tchan {
        fmt.Println("Got the value ", val)
    }
}

// createChannels registers a new subscriber every five seconds: it creates
// an unbuffered channel, sends it on intchangen, and then starts a
// handleChannel goroutine to receive from it. The function never returns.
func createChannels(intchangen chan (chan uint32)) {
    for range time.Tick(5 * time.Second) {
        fmt.Println("Creating new channel! ")
        fresh := make(chan uint32)
        intchangen <- fresh
        go handleChannel(fresh)
    }
}

// main starts the list maintainer and the broadcaster in the background,
// then runs the subscriber creator on the main goroutine. One channel
// carries subscriber lists to the broadcaster; the other carries newly
// created subscriber channels to the list maintainer.
func main() {
    var (
        snapshots     = make(chan [](chan uint32))
        registrations = make(chan (chan uint32))
    )
    go handleNewChannels(snapshots, registrations)
    go sendToChannels(snapshots)
    createChannels(registrations)
}
like image 2
João Marcus Avatar answered Nov 20 '22 17:11

João Marcus