I have several channels c1, c2, c3, c4 ..., How can I collect all data from those channel into one channel? my code:
package main
import (
"fmt"
"sync"
)
func putToChannel(c chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 1; i < 6; i++ {
c <- i
}
}
func main() {
c := make(chan int, 15)
c1 := make(chan int, 5)
c2 := make(chan int, 5)
c3 := make(chan int, 5)
go func(){c <- <-c1}()
go func(){c <- <-c2}()
go func(){c <- <-c3}()
wg := new(sync.WaitGroup)
wg.Add(3)
go putToChannel(c1, wg)
go putToChannel(c2, wg)
go putToChannel(c3, wg)
wg.Wait()
close(c)
for i := range c {
fmt.Println("Receive:", i)
}
fmt.Println("Finish")
}
I want to compose all the data from c1, c2 ... to c but it not works
This article has a good writeup of how to do "Fan-In" of channels, including stopping short.
There's a problem with these lines:
go func(){c <- <-c1}()
go func(){c <- <-c2}()
go func(){c <- <-c3}()
Each of these will receive one value from cx
channel and send that one value into c
.
You need a method that looks like this;
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
This method relies on the fact that the channels cs...
that are being passed to merge
are closed when there are not more values to be sent.
This means you will need to update your putToChannel
method also
func putToChannel(c chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
defer close(c)
for i := 1; i < 6; i++ {
c <- i
}
}
One final thing that it's worth noting is that in general; try and encapsulate the function that creates & send to a channel and the function that closes a channel to be the same function. This means you will never try and send on a closed channel.
Instead of:
c1 := make(chan int, 5)
go putToChannel(c1, wg)
You can do;
func generator() (<-chan int) {
c := make(chan int, 5)
go func() {
for i := 1; i < 6; i++ {
c <- i
}
close(c)
}()
return c
}
Your main method will look something like:
func main() {
var cs []<-chan int
cs = append(cs, generator())
cs = append(cs, generator())
cs = append(cs, generator())
c := merge(cs...)
for v := range c {
fmt.Println(v)
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With