I'm implementing the pipeline fan-in/fan-out pattern, but I don't understand why the code behaves the way it does. Please explain.
My first version. All tasks run in the first goroutine: https://go.dev/play/p/X6DqEuj86cZ
    func main() {
        var rdNums []int
        for i := 0; i < 1000; i++ {
            rdNums = append(rdNums, i)
        }
        pl := generatePipeline(rdNums)
        c1 := fanOut(pl, "1")
        c2 := fanOut(pl, "2")
        c3 := fanOut(pl, "3")
        c4 := fanOut(pl, "4")
        c := fanIn(c1, c2, c3, c4)
        sum := 0
        for i := range c {
            sum += i
        }
        fmt.Println(sum)
    }

    func generatePipeline(arrNums []int) <-chan int {
        pl := make(chan int, 100)
        go func() {
            for _, n := range arrNums {
                pl <- n
            }
            close(pl)
        }()
        return pl
    }

    func fanOut(in <-chan int, name string) <-chan int {
        out := make(chan int)
        go func() {
            for v := range in {
                fmt.Printf("Push square of %d to channel %s \n", v*v, name)
                out <- v * v
            }
            close(out)
        }()
        return out
    }

    func fanIn(inputChan ...<-chan int) <-chan int {
        in := make(chan int)
        go func() {
            for _, c := range inputChan {
                for v := range c {
                    in <- v
                }
            }
            close(in)
        }()
        return in
    }
My second version. The tasks are spread roughly evenly across the goroutines: https://go.dev/play/p/7vg8X6KgUrp
    func main() {
        var rdNums []int
        for i := 0; i < 1000; i++ {
            rdNums = append(rdNums, i)
        }
        pl := createQueue(rdNums)
        for i := 0; i < 5; i++ {
            go process(pl, fmt.Sprintf("worker%d", i))
        }
        time.Sleep(1 * time.Minute)
    }

    func createQueue(arrNums []int) <-chan int {
        pl := make(chan int)
        go func() {
            for _, n := range arrNums {
                pl <- n
            }
            close(pl)
        }()
        return pl
    }

    func process(in <-chan int, name string) {
        count := 0
        go func() {
            for v := range in {
                fmt.Printf("Push square of %d to channel %s \n", v*v, name)
                count++
            }
            fmt.Printf("Process %s success, total number received %d\n", name, count)
        }()
    }
I'd really like to understand why there is this difference. Thanks.
Your first fan-in implementation is broken.
    go func() {
        for _, c := range inputChan {
            for v := range c {
                in <- v
            }
        }
        close(in)
    }()
The goroutine above takes the first channel in inputChan and reads from it until that channel is closed, which only happens once the generator is done and pl has been drained. Four goroutines are sending to four unbuffered channels, so while the first channel is being read, each of the other three can take at most one value from pl and then blocks trying to send it, because nothing is receiving on its channel yet. The first worker therefore processes almost everything. Once the first channel is closed, the remaining three goroutines can deliver their pending values, and the program terminates.
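To see the imbalance in numbers rather than in a wall of log lines, here is a small sketch that counts how many values each worker handles when the inputs are drained one after another, as in the question. The names countingWorker, sequentialFanIn and workerCounts are made up for this illustration and are not from the original code.

```go
package main

import "fmt"

// countingWorker squares values from in and records in *n how many it handled.
// Hypothetical helper, a stand-in for fanOut from the question.
func countingWorker(in <-chan int, n *int) <-chan int {
	out := make(chan int) // unbuffered, as in the question
	go func() {
		defer close(out)
		for v := range in {
			*n++
			out <- v * v // blocks until the fan-in goroutine reads this channel
		}
	}()
	return out
}

// sequentialFanIn drains its inputs one after another, like the question's fanIn.
func sequentialFanIn(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, c := range inputs {
			for v := range c {
				out <- v
			}
		}
	}()
	return out
}

// workerCounts pushes 0..total-1 through the pipeline and returns how many
// values each worker processed.
func workerCounts(total, workers int) []int {
	src := make(chan int, 100)
	go func() {
		defer close(src)
		for i := 0; i < total; i++ {
			src <- i
		}
	}()
	counts := make([]int, workers)
	outs := make([]<-chan int, workers)
	for i := range outs {
		outs[i] = countingWorker(src, &counts[i])
	}
	// Drain the merged channel. Once it is closed, every worker has exited,
	// so reading counts afterwards is safe.
	for range sequentialFanIn(outs...) {
	}
	return counts
}

func main() {
	// The first worker handles nearly everything; the others handle at most one each.
	fmt.Println(workerCounts(1000, 4))
}
```

The exact split can vary from run to run, but every worker after the first is limited to a single value, because its send stays blocked until the first channel has been closed.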
The problem can be fixed by moving the inner loop that reads from an input channel into its own goroutine, one per channel, with all of them sending to a common output channel. Close that output channel only after every one of those goroutines has finished.
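A minimal sketch of that fix, assuming a sync.WaitGroup is used to decide when the common output channel may be closed. fanIn keeps the name from the question; generate, square and sumSquares are hypothetical helpers added only to make the example self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// generate sends every number in nums on a channel and then closes it.
func generate(nums []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square reads from in and sends each squared value on its own output channel.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

// fanIn merges all inputs concurrently: one forwarding goroutine per input,
// so no worker waits for another worker's channel to be drained first.
func fanIn(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, c := range inputs {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	// Close out only after every forwarding goroutine has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumSquares wires the pipeline together with the given number of workers.
func sumSquares(nums []int, workers int) int {
	src := generate(nums)
	outs := make([]<-chan int, 0, workers)
	for i := 0; i < workers; i++ {
		outs = append(outs, square(src))
	}
	sum := 0
	for v := range fanIn(outs...) {
		sum += v
	}
	return sum
}

func main() {
	nums := make([]int, 1000)
	for i := range nums {
		nums[i] = i
	}
	fmt.Println(sumSquares(nums, 4)) // prints 332833500
}
```

With this version every worker's channel always has a reader, so the work is spread across the workers much like in the second program from the question.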