Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to run multiple goroutines and collect results in the same order it runs

Tags:

go

I have the following code which has a double-go routine structure:

package main

import(
    "fmt"
    "math/rand"
    "time"
    "strconv"
)

func main(){
    outchan := make(chan string)
    for i:=0;i<10;i++{
        go testfun(i, outchan)
    }
    for i:=0;i<10;i++{
        a := <-outchan
        fmt.Println(a)
    }
}

func testfun(i int, outchan chan<- string){
    outchan2 := make(chan int)
    time.Sleep(time.Millisecond*time.Duration(int64(rand.Intn(10))))
    for j:=0;j<10;j++ {
        go testfun2(j, outchan2)
    }
    tempStr := strconv.FormatInt(int64(i),10)+" - "
    for j:=0;j<10;j++ {
        tempStr = tempStr + strconv.FormatInt(int64(<-outchan2),10)
    }
    outchan <- tempStr
}

func testfun2(j int, outchan2 chan<- int){
    time.Sleep(time.Millisecond*time.Duration(int64(rand.Intn(10))))
    outchan2 <- j
}

The output I was expecting is

0 - 0123456789
1 - 0123456789
2 - 0123456789
3 - 0123456789
4 - 0123456789
5 - 0123456789
6 - 0123456789
7 - 0123456789
8 - 0123456789
9 - 0123456789

But instead I got this:

7 - 7980345261
6 - 4035897621
3 - 9047526831
9 - 4032861975
8 - 9570831624
5 - 3798021546
1 - 0985362471
0 - 1849276035
2 - 9572806143
4 - 5768032419

Could anyone show me how to achieve the output I was expecting? I'm a newbie and please forgive me if the solution is obvious. I've been looking for it for days.

like image 616
Kyle Avatar asked Jan 07 '23 11:01

Kyle


2 Answers

To give you a better idea. The issue is that you're reading a single channel where the values that are pushed onto the channel are in an arbitrary order due to your time.Sleep calls. If you want to issue the time.Sleep calls concurrently to simulate concurrent long-running processes, what you'll want to do is make each goroutine write to a channel with the results.

This way you can iterate across an in-order list of the results channels blocking until the next channel can be read from (the same idea as the output queue in this answer Maintaining Order in a Multi-Threaded Pipeline) Here's your reworked code with some name changes to make things easier to track:

package main

import(
    "fmt"
    "math/rand"
    "time"
    "strconv"
)

func main(){
    var jobs []chan string
    for i := 0; i<10; i++{
        job := make(chan string)
        jobs = append(jobs, job)
        go testfun(i, job)
    }
    for _, result := range jobs {
      fmt.Println(<-result)
    }
}

func testfun(i int, job chan<- string){
    var innerJobs []chan int
    time.Sleep(time.Millisecond*time.Duration(int64(rand.Intn(10))))
    for j := 0; j<10; j++ {
        innerJob := make(chan int)
        innerJobs = append(innerJobs, innerJob)
        go testfun2(j, innerJob)
    }
    tempStr := strconv.FormatInt(int64(i),10)+" - "
    for _, result := range innerJobs {
      tempStr = tempStr + strconv.FormatInt(int64(<-result),10)
    }
    job <- tempStr
}

func testfun2(j int, innerJob chan<- int){
    time.Sleep(time.Millisecond*time.Duration(int64(rand.Intn(10))))
    innerJob <- j
}
like image 182
photoionized Avatar answered Jan 16 '23 19:01

photoionized


A different / more efficient approach is using a slice (or an array) and using a sync.WaitGroup:

func main() {
    var wg sync.WaitGroup
    out := make([]string, 10)
    for i := 0; i < len(out); i++ {
        wg.Add(1)
        go testfun(i, &out[i], &wg)
    }
    wg.Wait()
    for i := 0; i < len(out); i++ {
        a := out[i]
        fmt.Println(a)
    }
}

func testfun(i int, outVal *string, wg *sync.WaitGroup) {
    //........
    *outVal = tempStr
    wg.Done()
}

playground

edit: updated the example for testfun2 as well, forgot about that.

like image 26
OneOfOne Avatar answered Jan 16 '23 19:01

OneOfOne