
Event driven pattern in golang

I am using Go to implement a simple event-driven worker. It looks like this:

    go func() {
        for {
            select {
            case data := <-ch:
                time.Sleep(1)
                someGlobalMap[data.key] = data.value
            }
        }
    }()

The main function creates several goroutines, and each of them does something like this:

ch <- data
fmt.Println(someGlobalMap[data.key])

As you can see, because my worker needs some time to do the work, I get a nil result in my main function. How can I control this workflow properly?

harryz asked Jun 05 '16 16:06


1 Answer

EDIT: I may have misread your question. You mention that main starts many producer goroutines; I had assumed many consumer goroutines and a single producer. I'm leaving the original answer here in case it is useful to others looking for that pattern, and its bullet points still apply to your case.

If I understand your use case correctly, you can't send on a channel and expect to read the result immediately afterwards, because you don't know when the worker will process that send. The goroutines need to communicate, and that is done with channels. Assuming that simply calling a function with a return value doesn't work in your scenario, and you really need to send to a worker and then block until you have the result, you could send a channel as part of the data structure and block on a receive from it after the send, i.e.:

resCh := make(chan Result)
ch <- Data{key, value, resCh}
res := <-resCh
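
To make the three lines above concrete, here is a minimal runnable sketch of that reply-channel idea. The Result type, the resCh field name, and the worker and set helpers are my own assumptions for illustration; the question does not define them. In this sketch the map lives inside the worker rather than in a global, so only one goroutine ever touches it.

```go
package main

import "fmt"

// Result is a hypothetical reply type; the snippet above does not define it.
type Result struct {
	key   string
	value int
}

// Data carries the payload plus the channel on which the worker replies.
type Data struct {
	key   string
	value int
	resCh chan Result
}

// worker owns the map, so no other goroutine accesses it and no mutex is needed.
// It replies on the channel that came with each request.
func worker(ch <-chan Data) {
	store := make(map[string]int)
	for d := range ch {
		store[d.key] = d.value
		d.resCh <- Result{d.key, store[d.key]}
	}
}

// set sends one request and blocks until the worker has replied.
func set(ch chan<- Data, key string, value int) Result {
	resCh := make(chan Result)
	ch <- Data{key, value, resCh}
	return <-resCh
}

func main() {
	ch := make(chan Data)
	go worker(ch)

	res := set(ch, "answer", 42)
	fmt.Println(res.key, res.value) // prints "answer 42"

	close(ch) // lets the worker's range loop end
}
```

Each caller gets its own reply channel, so several producer goroutines can call set concurrently without seeing each other's results.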

But you should probably try to break the work down into a pipeline of independent steps instead; see the blog post linked in the original answer below.
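
As a rough illustration of what such a pipeline can look like (this is my own minimal sketch, not code from the question or from the blog post), each stage below receives from an input channel, does one step, and closes its output channel when its input is exhausted. The stage names generate and double are made up for the example.

```go
package main

import "fmt"

// generate is the first stage: it emits the given values and then closes
// its output channel.
func generate(values ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

// double is an independent stage: it reads from in until in is closed,
// sends each value multiplied by two, and then closes its own output.
func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * 2
		}
	}()
	return out
}

func main() {
	// The consumer simply ranges over the last stage's output.
	for v := range double(generate(1, 2, 3)) {
		fmt.Println(v) // prints 2, 4, 6 on separate lines
	}
}
```

Because every stage closes the channel it sends on, the consumer never has to guess when a result is ready: it just receives the next value.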


Original answer where I thought it was a single producer - multiple consumers/workers pattern:

This is a common pattern for which Go's goroutine and channel semantics are very well suited. There are a few things you need to keep in mind:

  • The main function will not automatically wait for goroutines to finish. If there's nothing else to do in main, the program exits and you don't get your results.

  • The global map that you use is not safe for concurrent access. You need to synchronize access via a mutex, but there's a better way - use an output channel for results, which is already synchronized.

  • You can use a for..range over a channel, and you can safely share a channel between multiple goroutines. As we'll see, that makes this pattern quite elegant to write.
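
For completeness, the mutex option from the second bullet could look roughly like the sketch below. The safeMap type and its set and get methods are hypothetical names I chose for the example, not something from the question.

```go
package main

import (
	"fmt"
	"sync"
)

// safeMap is a hypothetical wrapper that guards a plain map with a mutex.
type safeMap struct {
	mu sync.Mutex
	m  map[string]int
}

func newSafeMap() *safeMap {
	return &safeMap{m: make(map[string]int)}
}

// set stores value under key while holding the lock.
func (s *safeMap) set(key string, value int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[key] = value
}

// get returns the value for key and whether it was present.
func (s *safeMap) get(key string) (int, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.m[key]
	return v, ok
}

func main() {
	sm := newSafeMap()

	// Ten goroutines write concurrently; the mutex serializes the writes.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			sm.set(fmt.Sprintf("k%d", i), i)
		}(i)
	}
	wg.Wait()

	v, ok := sm.get("k3")
	fmt.Println(v, ok) // prints "3 true"
}
```

Note that the mutex only prevents data races. It does not tell a reader when a value has been written, which is why the channel-based approach is still the better fit for the workflow in the question.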

Playground: https://play.golang.org/p/WqyZfwldqp

For more on Go pipelines and concurrency patterns, including error handling and early cancellation: https://blog.golang.org/pipelines

Commented code for the use-case you mention:

package main

import (
    "fmt"
    "strconv"
    "sync"
    "time"
)

// could be a command-line flag, a config, etc.
const numGoros = 10

// Data is a similar data structure to the one mentioned in the question.
type Data struct {
    key   string
    value int
}

func main() {
    var wg sync.WaitGroup

    // create the input channel that sends work to the goroutines
    inch := make(chan Data)
    // create the output channel that sends results back to the main function
    outch := make(chan Data)

    // the WaitGroup keeps track of pending goroutines, you can add numGoros
    // right away if you know how many will be started, otherwise do .Add(1)
    // each time before starting a worker goroutine.
    wg.Add(numGoros)
    for i := 0; i < numGoros; i++ {
        // because it uses a closure, it could've used inch and outch automatically,
        // but if the func gets bigger you may want to extract it to a named function,
        // and I wanted to show the directed channel types: within that function, you
        // can only receive from inch, and only send (and close) to outch.
        //
        // It also receives the index i, just for fun so it can set the goroutines'
        // index as key in the results, to show that it was processed by different
        // goroutines. Also, big gotcha: before Go 1.22, do not capture a for-loop
        // iteration variable in a closure, pass it as argument, otherwise it very
        // likely won't do what you expect (since Go 1.22 each iteration gets its
        // own variable).
        go func(i int, inch <-chan Data, outch chan<- Data) {
            // make sure WaitGroup.Done is called on exit, so Wait unblocks
            // eventually.
            defer wg.Done()

            // range over a channel gets the next value to process, safe to share
            // concurrently between all goroutines. It exits the for loop once
            // the channel is closed and drained, so wg.Done will be called once
            // inch is closed.
            for data := range inch {
                // process the data...
                time.Sleep(10 * time.Millisecond)
                outch <- Data{strconv.Itoa(i), data.value}
            }
        }(i, inch, outch)
    }

    // start the goroutine that prints the results, use a separate WaitGroup to track
    // it (could also have used a "done" channel but the for-loop would be more complex, with a select).
    var wgResults sync.WaitGroup
    wgResults.Add(1)
    go func(ch <-chan Data) {
        defer wgResults.Done()

        // to prove it processed everything, keep a counter and print it on exit
        var n int
        for data := range ch {
            fmt.Println(data.key, data.value)
            n++
        }

        // for fun, try commenting out the wgResults.Wait() call at the end, the output
        // will likely miss this line.
        fmt.Println(">>> Processed: ", n)
    }(outch)

    // send work, wherever that comes from...
    for i := 0; i < 1000; i++ {
        inch <- Data{"main", i}
    }

    // when there's no more work to send, close inch, so the goroutines will finish
    // draining it and exit once all values have been processed.
    close(inch)

    // wait for all goroutines to exit
    wg.Wait()

    // at this point, no more results will be written to outch, close it to signal
    // to the results goroutine that it can terminate.
    close(outch)

    // and wait for the results goroutine to actually exit, otherwise the program would
    // possibly terminate without printing the last few values.
    wgResults.Wait()
}

In real-life scenarios, where the amount of work is not known ahead of time, the closing of the in-channel could come from e.g. a SIGINT signal. Just make sure no code path can send work after the channel was closed as that would panic.

mna answered Oct 13 '22 09:10