
What is the neatest idiom for producer/consumer in Go?

Tags: concurrency, go

What I would like to do is have a set of producer goroutines (of which some may or may not complete) and a consumer routine. The issue is with that caveat in parentheses - we don't know the total number that will return an answer.

So what I want to do is this:

package main

import (
  "fmt"
  "math/rand"
)

func producer(c chan int) {
  // May or may not produce.
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
}

func main() {
  c := make(chan int, 10)
  for i := 0; i < 10; i++ {
    go producer(c)
  }

  // If we include a close, then that's WRONG. Chan will be closed
  // but a producer will try to write to it. Runtime error.
  close(c)

  // If we don't close, then that's WRONG. All goroutines will
  // deadlock, since the range keyword will look for a close.
  for num := range c {
    fmt.Printf("Producer produced: %d\n", num)
  }
  fmt.Println("All done.")
}

So the issue is, if I close it's wrong, if I don't close - it's still wrong (see comments in code).

Now, the solution would be an out-of-band signal channel, that ALL producers write to:

package main

import (
  "fmt"
  "math/rand"
)

func producer(c chan int, signal chan bool) {
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
  signal <- true
}

func main() {
  c := make(chan int, 10)
  signal := make(chan bool, 10)
  for i := 0; i < 10; i++ {
    go producer(c, signal)
  }

  // This is basically a 'join'.
  num_done := 0
  for num_done < 10 {
    <- signal
    num_done++
  }
  close(c)

  for num := range c {
    fmt.Printf("Producer produced: %d\n", num)
  }
  fmt.Println("All done.")
}

And that totally does what I want! But to me it seems like a mouthful. My question is: Is there any idiom/trick that lets me do something similar in an easier way?

I had a look here: http://golang.org/doc/codewalk/sharemem/ And it seems like the complete chan (initialised at the start of main) is used in a range but never closed. I do not understand how.

If anyone has any insights, I would greatly appreciate it. Cheers!


Edit: fls0815 has the answer, and has also answered the question of how the close-less channel range works.

My code above, modified to work (done before fls0815 kindly supplied code):

package main

import (
  "fmt"
  "math/rand"
  "sync"
)

var wg_prod sync.WaitGroup
var wg_cons sync.WaitGroup

func producer(c chan int) {
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
  wg_prod.Done()
}

func main() {
  c := make(chan int, 10)
  wg_prod.Add(10)
  for i := 0; i < 10; i++ {
    go producer(c)
  }

  wg_cons.Add(1)
  go func() {
    for num := range c {
      fmt.Printf("Producer produced: %d\n", num)
    }
    wg_cons.Done()
  }()

  wg_prod.Wait()
  close(c)
  wg_cons.Wait()
  fmt.Println("All done.")
}

Will asked Jun 18 '12 00:06


2 Answers

Only producers should close channels. You can achieve your goal by starting one or more consumers that iterate (range) over the resulting channel once your producers have been started. In your main goroutine you wait (see sync.WaitGroup) until the producers have finished their work, then close the resulting channel, which makes the consumers exit (range exits once the channel is closed and no buffered item is left). Finally, you wait until the consumers have finished as well.

Example code:

package main

import (
    "log"
    "sync"
    "time"
    "math/rand"
    "runtime"
)

func consumer() {
    defer consumer_wg.Done()

    for item := range resultingChannel {
        log.Println("Consumed:", item)
    }
}

func producer() {
    defer producer_wg.Done()

    success := rand.Float32() > 0.5
    if success {
        resultingChannel <- rand.Int()
    }
}

var resultingChannel = make(chan int)
var producer_wg sync.WaitGroup
var consumer_wg sync.WaitGroup

func main() {
    rand.Seed(time.Now().Unix())

    for c := 0; c < runtime.NumCPU(); c++ {
        producer_wg.Add(1)  
        go producer()
    }

    for c := 0; c < runtime.NumCPU(); c++ {
        consumer_wg.Add(1)
        go consumer()
    }

    producer_wg.Wait()

    close(resultingChannel)

    consumer_wg.Wait()
}
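
The remark that range only exits once the channel is closed and no buffered item is left can be checked in isolation. The following is a minimal sketch, not part of the original answer; the helper name drain is made up for illustration.

```go
package main

import "fmt"

// drain (hypothetical helper) fills a buffered channel, closes it while
// the values are still sitting in the buffer, and returns everything a
// range loop receives from it afterwards.
func drain(values ...int) []int {
	c := make(chan int, len(values))
	for _, v := range values {
		c <- v
	}
	close(c) // safe here: no sender is left

	var got []int
	for v := range c { // delivers the buffered values first, then exits
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(drain(1, 2, 3)) // prints [1 2 3]
}
```

Closing does not discard what is already buffered; it only tells receivers that nothing more will arrive.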

The reason I put the close statement into the main function is that we have more than one producer. Closing the channel in one of the producers in the example above would lead to the problem you already ran into (writing to a closed channel), because there could be another producer left that still produces data. Channels should only be closed when there is no producer left (hence my suggestion that only the producing side closes the channel). This is how channels are designed in Go. Here you'll find some more information on closing channels.
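
One common variation on the same idea, sketched here under assumptions rather than taken from the answer, moves the wait-then-close step into a small goroutine of its own, so the caller can simply range over the channel. The function name produce and the local names are hypothetical.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// produce (hypothetical helper) starts n producers that each may or may
// not send a value, and returns a channel that is closed once every
// producer has returned.
func produce(n int) <-chan int {
	c := make(chan int)
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			if rand.Float32() > 0.5 {
				c <- rand.Int()
			}
		}()
	}
	// Closer goroutine: the only place where the channel is closed,
	// and only after all producers are done.
	go func() {
		wg.Wait()
		close(c)
	}()
	return c
}

func main() {
	count := 0
	for num := range produce(10) {
		fmt.Printf("Producer produced: %d\n", num)
		count++
	}
	fmt.Println("All done, values received:", count)
}
```

The rule stays the same: the close happens exactly once, on the producing side, after the last producer has finished.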


Related to the sharemem example: AFAICS this example runs endlessly by re-queuing the Resources again and again (from pending -> complete -> pending -> complete... and so on). This is what the iteration at the end of the main func does: it receives the completed Resources and re-queues them to pending using Resource.Sleep(). When there is no completed Resource, it blocks and waits for new Resources to be completed. Therefore there is no need to close the channels, because they are in use all the time.
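
A stripped-down sketch of that pending/complete cycle might look like the following. It is not the codewalk's code: plain ints stand in for the Resources, the function name requeue is made up, and a round limit is added (an assumption, the codewalk loops forever) so that the sketch terminates.

```go
package main

import "fmt"

// requeue (hypothetical helper) bounces one item between pending and
// complete and returns how many completed items the loop handled.
func requeue(rounds int) int {
	pending := make(chan int)
	complete := make(chan int)

	// Worker: takes items from pending and hands them back on complete.
	// Neither channel is ever closed.
	go func() {
		for item := range pending {
			complete <- item + 1
		}
	}()

	// Seed the cycle with a single item.
	go func() { pending <- 0 }()

	handled := 0
	for item := range complete { // blocks while nothing is completed
		handled++
		if item >= rounds {
			break // assumed stop condition, only so the sketch ends
		}
		// Re-queue from a separate goroutine so this loop never blocks
		// on the send.
		go func(v int) { pending <- v }(item)
	}
	return handled
}

func main() {
	fmt.Println("Completed items handled:", requeue(5))
}
```

Without the break, the range over complete would simply keep blocking and resuming, which is why the codewalk never needs a close.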

fls0815 answered Oct 21 '22 16:10


There are always lots of ways to solve these problems. Here's a solution using the simple synchronous channels that are fundamental in Go. No buffered channels, no closing channels, no WaitGroups.

It's really not that far from your "mouthful" solution, and--sorry to disappoint--not that much smaller. It does put the consumer in its own goroutine, so that the consumer can consume numbers as the producers produce them. It also makes the distinction that a production "try" can end in either success or failure. If production fails, the try is done immediately. If it succeeds, the try is not done until the number is consumed.

package main

import (
    "fmt"
    "math/rand"
)

func producer(c chan int, fail chan bool) {
    if success := rand.Float32() > 0.5; success {
        c <- rand.Int()
    } else {
        fail <- true
    }
}

func consumer(c chan int, success chan bool) {
    for {
        num := <-c
        fmt.Printf("Producer produced: %d\n", num)
        success <- true
    }
}

func main() {
    const nTries = 10
    c := make(chan int)
    done := make(chan bool)
    for i := 0; i < nTries; i++ {
        go producer(c, done)
    }
    go consumer(c, done)

    for i := 0; i < nTries; i++ {
        <-done
    }
    fmt.Println("All done.")
}

Sonia answered Oct 21 '22 17:10