The idea is to have a variable number of channels in a slice, push each value received through them into a single channel, and close this output channel once the last one of the input channels is closed. Something like this, but for a number of channels more than two:
func multiplex(cin1, cin2, cout chan int) {
n := 2
for {
select {
case v, ok := <-cin1:
if ok {
cout <- v
} else {
n -= 1
}
case v, ok := <-cin2:
if ok {
cout <- v
} else {
n -= 1
}
}
if n == 0 {
close(cout)
break
}
}
}
The above code avoids busy looping since there is no default
case, which is good (EDIT: it looks like the presence of ", ok" makes the select statement non-blocking and the loop is busy after all. But for the sake of the example, think of the code as if it would block). Could the same kind of functionality also be achieved with an arbitrary number of input channels? Obviously, this could be done by reducing the slice pairwise to a single channel, but I would be more interested in a simpler solution, if possible.
I believe this snippet does what you're looking for. I've changed the signature so that it's clear that the inputs and output should only be used for communication in one direction. Note the addition of a sync.WaitGroup
, you need some way for all of the inputs to signal that they have completed, and this is pretty easy.
func combine(inputs []<-chan int, output chan<- int) {
var group sync.WaitGroup
for i := range inputs {
group.Add(1)
go func(input <-chan int) {
for val := range input {
output <- val
}
group.Done()
} (inputs[i])
}
go func() {
group.Wait()
close(output)
} ()
}
Edit: added pairwise reduction example code and reordered parts of the answer.
The preferred solution is the non-answer of "restructure so that you don't have a slice of channels." The restructuring can often make use of the feature that multiple goroutines can send to a single channel. So instead of having each of your sources send on separate channels and then having to deal with receiving from a bunch of channels, just create one channel and let all the sources send on that channel.
Go does not offer a feature to receive from a slice of channels. It's a frequently asked question, and while the solution just given is preferred, there are ways to program it. A solution I thought you were suggesting in your original question by saying "reducing the slice pairwise" is a binary divide and conquer solution. This works just fine, as long as you have a solution for multiplexing two channels into one. Your example code for this is very close to working.
You're just missing one little trick to make your example code work. Where you decrement n, add a line to set the channel variable to nil. For example, I made the code read
case v, ok := <-cin1:
if ok {
cout <- v
} else {
n--
cin1 = nil
}
case v, ok := <-cin2:
if ok {
cout <- v
} else {
n--
cin2 = nil
}
}
This solution does what you want and is not busy waiting.
So then, a full example incorporating this solution into a function that multiplexes a slice:
package main
import (
"fmt"
"time"
)
func multiplex(cin []chan int, cout chan int) {
var cin0, cin1 chan int
switch len(cin) {
case 2:
cin1 = cin[1]
fallthrough
case 1:
cin0 = cin[0]
case 0:
default:
cin0 = make(chan int)
cin1 = make(chan int)
half := len(cin) / 2
go multiplex(cin[:half], cin0)
go multiplex(cin[half:], cin1)
}
for cin0 != nil || cin1 != nil {
select {
case v, ok := <-cin0:
if ok {
cout <- v
} else {
cin0 = nil
}
case v, ok := <-cin1:
if ok {
cout <- v
} else {
cin1 = nil
}
}
}
close(cout)
}
func main() {
cin := []chan int{
make(chan int),
make(chan int),
make(chan int),
}
cout := make(chan int)
for i, c := range cin {
go func(x int, cx chan int) {
for i := 1; i <= 3; i++ {
time.Sleep(100 * time.Millisecond)
cx <- x*10 + i
}
close(cx)
}(i, c)
}
go multiplex(cin, cout)
for v := range cout {
fmt.Println("main gets", v)
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With