
How to use a goroutine pool

I want to use Go to download stock price spreadsheets from Yahoo Finance. I'll be making an HTTP request for every stock in its own goroutine. I have a list of around 2500 symbols, but instead of making 2500 requests in parallel, I'd prefer to make 250 at a time. In Java I'd create a thread pool and reuse threads as they become free. I was looking for something similar, a goroutine pool, if you will, but was unable to find any resources. I'd appreciate it if someone could tell me how to accomplish this or point me to resources on it. Thanks!

tldr asked Aug 16 '13 06:08


People also ask

How do I use Goroutine channel?

If goroutines are the activities of a concurrent Go program, channels are the connections between them. A channel is a communication mechanism that enables one goroutine to send values to another goroutine. Each channel is a conduit for values of a particular type, called the channel's element type.
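A minimal sketch of that description, assuming two hypothetical helpers (`produce` and `sum`) that are not part of the original text: one goroutine sends values on a channel whose element type is `int`, and the caller receives them until the channel is closed.

```go
package main

import "fmt"

// produce is a hypothetical helper: it starts a goroutine that sends
// 1..n on a channel with element type int, then closes the channel.
func produce(n int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 1; i <= n; i++ {
			ch <- i // blocks until the receiving goroutine is ready
		}
	}()
	return ch
}

// sum receives values until the channel is closed and adds them up.
func sum(ch <-chan int) int {
	total := 0
	for v := range ch {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sum(produce(4)))
}
```

Closing the channel on the sending side is what lets the `range` loop on the receiving side terminate.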

Is Goroutine pool necessary?

No. Goroutines are cheap to create, so there is no need to pool them the way you would pool threads.

How do you know when a Goroutine is going to stop?

One goroutine can't forcibly stop another. To make a goroutine stoppable, let it listen for a stop signal on a dedicated quit channel, and check this channel at suitable points in your goroutine.

What is the use of Goroutine?

Go provides goroutines to support concurrency. A goroutine is a function that executes concurrently with other goroutines in the same program; goroutines are lightweight threads managed by the Go runtime. A goroutine starts with about 2 kB of stack space.


2 Answers

The simplest way, I suppose, is to create 250 goroutines and pass them a channel: the main goroutine sends links over that channel, and the child goroutines receive from it.

When all links have been sent, you close the channel, and each goroutine exits once it finishes its current job.

To keep the main goroutine from exiting before the children have processed their data, you can use sync.WaitGroup.

Here is some code to illustrate the point (a sketch, not a final working version):

    package main

    import (
        "fmt"
        "sync"
    )

    func worker(linkChan <-chan string, wg *sync.WaitGroup) {
        // Decrement the wait group's internal counter as soon as the goroutine finishes
        defer wg.Done()

        for url := range linkChan {
            // Analyze the value and do the job here
            fmt.Println("processing", url) // placeholder for the real download
        }
    }

    func main() {
        yourLinksSlice := []string{ /* fill in your links */ }

        lCh := make(chan string)
        wg := new(sync.WaitGroup)

        // Add the goroutines to the wait group and start them
        for i := 0; i < 250; i++ {
            wg.Add(1)
            go worker(lCh, wg)
        }

        // Process all links by handing them to whichever goroutines are free
        for _, link := range yourLinksSlice {
            lCh <- link
        }

        // Close the channel so the range loops in the workers end
        close(lCh)

        // Wait for all goroutines to finish (otherwise they die when main returns)
        wg.Wait()
    }
Rostyslav Dzinko answered Sep 20 '22 03:09


You can use the thread pool implementation library for Go from this git repo.

Here is a nice blog post about how to use channels as a thread pool.

Snippet from the blog:

    var (
        MaxWorker = os.Getenv("MAX_WORKERS")
        MaxQueue  = os.Getenv("MAX_QUEUE")
    )

    // Job represents the job to be run
    type Job struct {
        Payload Payload
    }

    // A buffered channel that we can send work requests on.
    var JobQueue chan Job

    // Worker represents the worker that executes the job
    type Worker struct {
        WorkerPool  chan chan Job
        JobChannel  chan Job
        quit        chan bool
    }

    func NewWorker(workerPool chan chan Job) Worker {
        return Worker{
            WorkerPool: workerPool,
            JobChannel: make(chan Job),
            quit:       make(chan bool)}
    }

    // Start method starts the run loop for the worker, listening for a quit channel in
    // case we need to stop it
    func (w Worker) Start() {
        go func() {
            for {
                // register the current worker into the worker queue.
                w.WorkerPool <- w.JobChannel

                select {
                case job := <-w.JobChannel:
                    // we have received a work request.
                    if err := job.Payload.UploadToS3(); err != nil {
                        log.Errorf("Error uploading to S3: %s", err.Error())
                    }

                case <-w.quit:
                    // we have received a signal to stop
                    return
                }
            }
        }()
    }

    // Stop signals the worker to stop listening for work requests.
    func (w Worker) Stop() {
        go func() {
            w.quit <- true
        }()
    }
Shettyh answered Sep 22 '22 03:09