 

Fan-in channels to single channel

Tags: go, channel

I have several channels c1, c2, c3, c4, ... How can I collect all the data from those channels into one channel? My code:

package main

import (
    "fmt"
    "sync"
)

func putToChannel(c chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 1; i < 6; i++ {
        c <- i
    }
}

func main() {
    c := make(chan int, 15)
    c1 := make(chan int, 5)
    c2 := make(chan int, 5)
    c3 := make(chan int, 5)
    go func(){c <- <-c1}()
    go func(){c <- <-c2}()
    go func(){c <- <-c3}()
    wg := new(sync.WaitGroup)
    wg.Add(3)
    go putToChannel(c1, wg)
    go putToChannel(c2, wg)
    go putToChannel(c3, wg)
    wg.Wait()
    close(c)
    for i := range c {
        fmt.Println("Receive:", i)
    }

    fmt.Println("Finish")
}

I want to merge all the data from c1, c2, ... into c, but it does not work.

Hau Ma asked Jul 09 '18 05:07



1 Answer

This article has a good writeup of how to do "Fan-In" of channels, including stopping short.

There's a problem with these lines:

go func(){c <- <-c1}()
go func(){c <- <-c2}()
go func(){c <- <-c3}()

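Each of these goroutines receives exactly one value from its channel (c1, c2, or c3), sends that one value into c, and then exits, so the remaining values are never forwarded. On top of that, main may call close(c) before a forwarding goroutine has sent its value, and sending on a closed channel panics.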
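To make the one-value behaviour concrete, here is a minimal sketch. The helpers forwardOnce and countForwarded are hypothetical names, not part of the question's code, and the forwarding is run synchronously (rather than in goroutines) purely so that the result is deterministic.

```go
package main

import "fmt"

// forwardOnce mirrors the body of `go func(){ c <- <-c1 }()` from the
// question: it moves exactly one value from src to dst and returns.
func forwardOnce(dst chan<- int, src <-chan int) {
    dst <- <-src
}

// countForwarded fills three buffered sources with 1..5, forwards once
// per source, and reports how many values actually reached dst.
func countForwarded() int {
    dst := make(chan int, 15)
    for s := 0; s < 3; s++ {
        src := make(chan int, 5)
        for i := 1; i < 6; i++ {
            src <- i
        }
        forwardOnce(dst, src)
    }
    close(dst)

    n := 0
    for range dst {
        n++
    }
    return n
}

func main() {
    fmt.Println("forwarded:", countForwarded(), "of 15") // forwarded: 3 of 15
}
```

Only the first value of each source arrives; the other twelve stay in the source buffers.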

You need a function that looks like this:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

This function relies on the fact that the channels cs... passed to merge are closed when there are no more values to be sent.
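The reason closing matters is that `for n := range c` only ends once c is closed and empty. The sketch below spells that loop out with the two-value receive; drain is a hypothetical helper used only for illustration.

```go
package main

import "fmt"

// drain receives from c until it is closed and empty, then returns
// everything it saw. It behaves like `for v := range c`.
func drain(c <-chan int) []int {
    var got []int
    for {
        v, ok := <-c
        if !ok { // channel is closed and has no buffered values left
            return got
        }
        got = append(got, v)
    }
}

func main() {
    c := make(chan int, 3)
    c <- 1
    c <- 2
    close(c) // values already buffered stay readable after close

    fmt.Println(drain(c)) // [1 2]
}
```

If close(c) were omitted here, drain would block forever after the second value, which is the same way an output goroutine in merge would hang on an input that is never closed.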

This means you will also need to update your putToChannel function:

func putToChannel(c chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    defer close(c)
    for i := 1; i < 6; i++ {
        c <- i
    }
}

One final thing worth noting: in general, try to have the same function create the channel, send to it, and close it. That way you will never try to send on a closed channel.

Instead of:

c1 := make(chan int, 5)
go putToChannel(c1, wg)

You can do:

func generator() <-chan int {
    c := make(chan int, 5)
    go func() {
        for i := 1; i < 6; i++ {
            c <- i
        }
        close(c)
    }()
    return c
}
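Keeping the send and the close in one place matters because a send on a closed channel is a run-time panic, not an error value. A small sketch of that failure mode follows; sendAfterClose is a hypothetical helper that recovers the panic so the program can report it.

```go
package main

import "fmt"

// sendAfterClose reports whether sending on an already-closed channel
// panics. The deferred recover turns the panic into a return value.
func sendAfterClose() (panicked bool) {
    defer func() {
        if r := recover(); r != nil {
            panicked = true
        }
    }()

    c := make(chan int, 1)
    close(c)
    c <- 1 // run-time panic: send on closed channel
    return false
}

func main() {
    fmt.Println("panicked:", sendAfterClose()) // panicked: true
}
```

When a single goroutine is the only sender and the only closer, as in generator, this situation cannot arise.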

Your main function will then look something like:

func main() {
    var cs []<-chan int

    cs = append(cs, generator())
    cs = append(cs, generator())
    cs = append(cs, generator())

    c := merge(cs...)
    for v := range c {
        fmt.Println(v)
    }
}
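For reference, here is one way the pieces from this answer could be assembled into a single runnable file. It is a sketch, not a definitive version: collectSorted is a hypothetical helper added here, and it sorts the results only because the order in which values arrive from different goroutines is not deterministic.

```go
package main

import (
    "fmt"
    "sort"
    "sync"
)

// generator owns its channel: it creates it, sends 1..5, and closes it.
func generator() <-chan int {
    c := make(chan int, 5)
    go func() {
        defer close(c)
        for i := 1; i < 6; i++ {
            c <- i
        }
    }()
    return c
}

// merge fans every input channel in to one output channel and closes
// the output once all inputs have been drained.
func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    wg.Add(len(cs))
    for _, c := range cs {
        go func(c <-chan int) {
            defer wg.Done()
            for n := range c {
                out <- n
            }
        }(c)
    }

    // Close out only after every forwarding goroutine has finished.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

// collectSorted (hypothetical) merges n generators and returns all
// received values in ascending order.
func collectSorted(n int) []int {
    cs := make([]<-chan int, 0, n)
    for i := 0; i < n; i++ {
        cs = append(cs, generator())
    }

    var got []int
    for v := range merge(cs...) {
        got = append(got, v)
    }
    sort.Ints(got)
    return got
}

func main() {
    fmt.Println(collectSorted(3)) // [1 1 1 2 2 2 3 3 3 4 4 4 5 5 5]
}
```

No outer WaitGroup is needed in main: the range loop over the merged channel ends exactly when merge closes it.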
Zak answered Nov 15 '22 19:11
