
Why do all tasks run in the first goroutine?

I'm implementing the pipeline fan-in/fan-out pattern, but I don't understand why the code behaves like this. Please explain it to me.

My first version. All tasks run in the first goroutine: https://go.dev/play/p/X6DqEuj86cZ

func main() {
    var rdNums []int
    for i := 0; i < 1000; i++ {
        rdNums = append(rdNums, i)
    }

    pl := generatePipeline(rdNums)
    c1 := fanOut(pl, "1")
    c2 := fanOut(pl, "2")
    c3 := fanOut(pl, "3")
    c4 := fanOut(pl, "4")

    c := fanIn(c1, c2, c3, c4)
    sum := 0
    for i := range c {
        sum += i
    }

    fmt.Println(sum)
}

func generatePipeline(arrNums []int) <-chan int {
    pl := make(chan int, 100)
    go func() {
        for _, n := range arrNums {
            pl <- n
        }

        close(pl)
    }()

    return pl
}

func fanOut(in <-chan int, name string) <-chan int {
    out := make(chan int)
    go func() {
        for v := range in {
            fmt.Printf("Push square of %d to channel %s \n", v*v, name)
            out <- v * v

        }

        close(out)
    }()

    return out
}

func fanIn(inputChan ...<-chan int) <-chan int {
    in := make(chan int)

    go func() {
        for _, c := range inputChan {
            for v := range c {
                in <- v
            }
        }

        close(in)
    }()

    return in
}

My second version. The tasks are spread roughly evenly across the goroutines: https://go.dev/play/p/7vg8X6KgUrp

func main() {
    var rdNums []int
    for i := 0; i < 1000; i++ {
        rdNums = append(rdNums, i)
    }

    pl := createQueue(rdNums)
    for i := 0; i < 5; i++ {
        go process(pl, fmt.Sprintf("worker%d", i))
    }

    time.Sleep(1 * time.Minute)
}

func createQueue(arrNums []int) <-chan int {
    pl := make(chan int)
    go func() {
        for _, n := range arrNums {
            pl <- n
        }

        close(pl)
    }()

    return pl
}

func process(in <-chan int, name string) {
    count := 0
    go func() {
        for v := range in {
            fmt.Printf("Push square of %d to channel %s \n", v*v, name)
            count++

        }

        fmt.Printf("Process %s success, total number received %d\n", name, count)
    }()
}

I'd really like to understand why there is this difference. Thanks.

Thang Ha asked Apr 08 '26 16:04


1 Answer

Your first fan-in implementation is broken.

    go func() {
        for _, c := range inputChan {
            for v := range c {
                in <- v
            }
        }

The above goroutine takes the first inputChan and reads from it until that channel is closed, which only happens once the generator is done and its channel has been drained. Four goroutines are sending data to four channels, and those channels are unbuffered, so while the first inputChan is being processed, each of the other three goroutines receives one value and then blocks trying to send it to its own channel. The first goroutine therefore ends up handling almost every value. Once the first inputChan is finished and closed, the remaining three goroutines can send their pending values, and the program terminates.
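The skew can be observed directly by counting how many values each worker handles. The following is only a minimal sketch of that idea, not the code from the question: `countPerWorker` is a hypothetical helper, the per-worker counters are an addition for illustration, and the sequential fan-in runs inline in the calling goroutine rather than in a goroutine of its own.

```go
package main

import "fmt"

// countPerWorker is a hypothetical helper (not part of the question's code).
// It sends the values 0..n-1 to `workers` squaring goroutines, merges their
// outputs with a sequential fan-in, and returns how many values each worker
// handled.
func countPerWorker(n, workers int) []int {
	src := make(chan int)
	go func() {
		defer close(src)
		for i := 0; i < n; i++ {
			src <- i
		}
	}()

	counts := make([]int, workers)
	outs := make([]chan int, workers)
	for i := range outs {
		outs[i] = make(chan int) // unbuffered, as in the question
		go func(id int) {
			defer close(outs[id])
			for v := range src {
				counts[id]++ // only this worker writes counts[id]
				outs[id] <- v * v
			}
		}(i)
	}

	// Sequential fan-in: drain each channel completely before moving on.
	// While outs[0] is being drained, every other worker can take at most
	// one value from src before it blocks on its own unbuffered send.
	for _, c := range outs {
		for range c {
		}
	}
	return counts
}

func main() {
	fmt.Println(countPerWorker(1000, 4))
}
```

With 1000 values and 4 workers, the first worker handles at least 997 of them, because each of the other three can hold at most one value while it waits for the fan-in loop to reach its channel.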

The problem can be fixed by moving the inner loop, which reads from an input channel, into its own goroutine, one for each channel, with all of them sending the data to a common output channel. The output channel should be closed only after all of those goroutines have finished.
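One possible shape for that fix is sketched below, assuming a `sync.WaitGroup` is used to decide when the common output channel can be closed. The names `generate`, `square` and `run`, and the per-worker counters, are illustrative and do not come from the question; only `fanIn` corresponds to the function being fixed.

```go
package main

import (
	"fmt"
	"sync"
)

// generate (hypothetical name) sends every number on a channel, then closes it.
func generate(nums []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square (hypothetical name) is one fan-out worker. It counts the values it
// handles in *processed, which no other worker writes to.
func square(in <-chan int, processed *int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			*processed++
			out <- v * v
		}
	}()
	return out
}

// fanIn starts one forwarding goroutine per input channel, so no input has
// to wait for another one to be drained first.
func fanIn(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, c := range inputs {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	// Close the common channel only after every forwarder has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// run (hypothetical name) wires the pipeline together and returns the sum of
// the squares plus the number of values each worker handled.
func run(n, workers int) (int, []int) {
	nums := make([]int, n)
	for i := range nums {
		nums[i] = i
	}
	src := generate(nums)

	counts := make([]int, workers)
	outs := make([]<-chan int, workers)
	for i := range outs {
		outs[i] = square(src, &counts[i])
	}

	sum := 0
	for v := range fanIn(outs...) {
		sum += v
	}
	return sum, counts
}

func main() {
	sum, counts := run(1000, 4)
	fmt.Println("sum:", sum)
	fmt.Println("values per worker:", counts)
}
```

The per-worker counts vary from run to run because they depend on goroutine scheduling, but no worker is forced to wait for another worker's channel to be drained. The channel is passed to each forwarding goroutine as an argument so the sketch behaves the same on Go versions before 1.22, where the loop variable was shared across iterations.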

Burak Serdar answered Apr 12 '26 13:04


